feat: add -ERR response infrastructure with SendErrAsync and SendErrAndCloseAsync

This commit is contained in:
Joseph Doherty
2026-02-22 21:37:32 -05:00
parent 8ee5a7f97b
commit 19e8c65f6d
3 changed files with 95 additions and 3 deletions

View File

@@ -29,6 +29,7 @@ public sealed class NatsClient : IDisposable
private readonly ServerInfo _serverInfo;
private readonly NatsParser _parser;
private readonly SemaphoreSlim _writeLock = new(1, 1);
private CancellationTokenSource? _clientCts;
private readonly Dictionary<string, Subscription> _subs = new();
private readonly ILogger _logger;
@@ -58,15 +59,16 @@ public sealed class NatsClient : IDisposable
public async Task RunAsync(CancellationToken ct)
{
_clientCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
var pipe = new Pipe();
try
{
// Send INFO
await SendInfoAsync(ct);
await SendInfoAsync(_clientCts.Token);
// Start read pump and command processing in parallel
var fillTask = FillPipeAsync(pipe.Writer, ct);
var processTask = ProcessCommandsAsync(pipe.Reader, ct);
var fillTask = FillPipeAsync(pipe.Writer, _clientCts.Token);
var processTask = ProcessCommandsAsync(pipe.Reader, _clientCts.Token);
await Task.WhenAny(fillTask, processTask);
}
@@ -81,6 +83,14 @@ public sealed class NatsClient : IDisposable
finally
{
Router?.RemoveClient(this);
try
{
_socket.Shutdown(SocketShutdown.Both);
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Client {ClientId} socket shutdown error", Id);
}
}
}
@@ -282,6 +292,36 @@ public sealed class NatsClient : IDisposable
}
}
public async Task SendErrAsync(string message)
{
var errLine = Encoding.ASCII.GetBytes($"-ERR '{message}'\r\n");
try
{
await WriteAsync(errLine, _clientCts?.Token ?? CancellationToken.None);
}
catch (OperationCanceledException)
{
// Expected during shutdown
}
catch (IOException ex)
{
_logger.LogDebug(ex, "Client {ClientId} failed to send -ERR", Id);
}
catch (ObjectDisposedException ex)
{
_logger.LogDebug(ex, "Client {ClientId} failed to send -ERR (disposed)", Id);
}
}
public async Task SendErrAndCloseAsync(string message)
{
await SendErrAsync(message);
if (_clientCts is { } cts)
await cts.CancelAsync();
else
_socket.Close();
}
public void RemoveAllSubscriptions(SubList subList)
{
foreach (var sub in _subs.Values)
@@ -291,6 +331,7 @@ public sealed class NatsClient : IDisposable
public void Dispose()
{
_clientCts?.Dispose();
_stream.Dispose();
_socket.Dispose();
_writeLock.Dispose();

View File

@@ -19,6 +19,13 @@ public static class NatsProtocol
public static readonly byte[] MsgPrefix = "MSG "u8.ToArray();
public static readonly byte[] HmsgPrefix = "HMSG "u8.ToArray();
public static readonly byte[] ErrPrefix = "-ERR "u8.ToArray();
// Standard error messages (matching Go server)
public const string ErrMaxConnectionsExceeded = "maximum connections exceeded";
public const string ErrStaleConnection = "Stale Connection";
public const string ErrMaxPayloadViolation = "Maximum Payload Violation";
public const string ErrInvalidPublishSubject = "Invalid Publish Subject";
public const string ErrInvalidSubject = "Invalid Subject";
}
public sealed class ServerInfo

View File

@@ -86,4 +86,48 @@ public class ClientTests : IAsyncDisposable
await _cts.CancelAsync();
}
[Fact]
public async Task Client_SendErrAsync_writes_correct_wire_format()
{
var runTask = _natsClient.RunAsync(_cts.Token);
// Read INFO first
var buf = new byte[4096];
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
// Trigger SendErrAsync
await _natsClient.SendErrAsync("Invalid Subject");
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
var response = Encoding.ASCII.GetString(buf, 0, n);
response.ShouldBe("-ERR 'Invalid Subject'\r\n");
await _cts.CancelAsync();
}
[Fact]
public async Task Client_SendErrAndCloseAsync_sends_error_then_disconnects()
{
var runTask = _natsClient.RunAsync(_cts.Token);
// Read INFO first
var buf = new byte[4096];
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
// Trigger SendErrAndCloseAsync
await _natsClient.SendErrAndCloseAsync("maximum connections exceeded");
var n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
var response = Encoding.ASCII.GetString(buf, 0, n);
response.ShouldBe("-ERR 'maximum connections exceeded'\r\n");
// Connection should be closed — next read returns 0
n = await _clientSocket.ReceiveAsync(buf, SocketFlags.None, readCts.Token);
n.ShouldBe(0);
}
}