feat: implement verbose mode (+OK after commands)
When a client sends CONNECT {"verbose":true}, the server now responds
with +OK\r\n after successfully processing CONNECT, PING, SUB, UNSUB,
and PUB/HPUB commands, matching the Go NATS server behavior.
This commit is contained in:
@@ -235,6 +235,8 @@ public sealed class NatsClient : IDisposable
|
||||
{
|
||||
Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks);
|
||||
ProcessPub(cmd, ref localInMsgs, ref localInBytes);
|
||||
if (ClientOpts?.Verbose == true)
|
||||
WriteProtocol(NatsProtocol.OkBytes);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -292,10 +294,14 @@ public sealed class NatsClient : IDisposable
|
||||
{
|
||||
case CommandType.Connect:
|
||||
await ProcessConnectAsync(cmd);
|
||||
if (ClientOpts?.Verbose == true)
|
||||
WriteProtocol(NatsProtocol.OkBytes);
|
||||
break;
|
||||
|
||||
case CommandType.Ping:
|
||||
WriteProtocol(NatsProtocol.PongBytes);
|
||||
if (ClientOpts?.Verbose == true)
|
||||
WriteProtocol(NatsProtocol.OkBytes);
|
||||
break;
|
||||
|
||||
case CommandType.Pong:
|
||||
@@ -304,10 +310,14 @@ public sealed class NatsClient : IDisposable
|
||||
|
||||
case CommandType.Sub:
|
||||
ProcessSub(cmd);
|
||||
if (ClientOpts?.Verbose == true)
|
||||
WriteProtocol(NatsProtocol.OkBytes);
|
||||
break;
|
||||
|
||||
case CommandType.Unsub:
|
||||
ProcessUnsub(cmd);
|
||||
if (ClientOpts?.Verbose == true)
|
||||
WriteProtocol(NatsProtocol.OkBytes);
|
||||
break;
|
||||
|
||||
case CommandType.Pub:
|
||||
|
||||
137
tests/NATS.Server.Tests/VerboseModeTests.cs
Normal file
137
tests/NATS.Server.Tests/VerboseModeTests.cs
Normal file
@@ -0,0 +1,137 @@
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using NATS.Server;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class VerboseModeTests : IAsyncLifetime
|
||||
{
|
||||
private readonly NatsServer _server;
|
||||
private readonly int _port;
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
|
||||
public VerboseModeTests()
|
||||
{
|
||||
_port = GetFreePort();
|
||||
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
|
||||
}
|
||||
|
||||
public async Task InitializeAsync()
|
||||
{
|
||||
_ = _server.StartAsync(_cts.Token);
|
||||
await _server.WaitForReadyAsync();
|
||||
}
|
||||
|
||||
public async Task DisposeAsync()
|
||||
{
|
||||
await _cts.CancelAsync();
|
||||
_server.Dispose();
|
||||
}
|
||||
|
||||
private static int GetFreePort()
|
||||
{
|
||||
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
|
||||
return ((IPEndPoint)sock.LocalEndPoint!).Port;
|
||||
}
|
||||
|
||||
private async Task<Socket> ConnectClientAsync()
|
||||
{
|
||||
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
|
||||
await sock.ConnectAsync(IPAddress.Loopback, _port);
|
||||
return sock;
|
||||
}
|
||||
|
||||
private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
|
||||
{
|
||||
using var cts = new CancellationTokenSource(timeoutMs);
|
||||
var sb = new StringBuilder();
|
||||
var buf = new byte[4096];
|
||||
while (!sb.ToString().Contains(expected))
|
||||
{
|
||||
var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
|
||||
if (n == 0) break;
|
||||
sb.Append(Encoding.ASCII.GetString(buf, 0, n));
|
||||
}
|
||||
return sb.ToString();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Verbose_mode_sends_OK_after_CONNECT()
|
||||
{
|
||||
using var client = await ConnectClientAsync();
|
||||
|
||||
// Read INFO
|
||||
await ReadUntilAsync(client, "\r\n");
|
||||
|
||||
// Send CONNECT with verbose:true
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":true}\r\n"));
|
||||
|
||||
// Should receive +OK after CONNECT
|
||||
var response = await ReadUntilAsync(client, "+OK\r\n");
|
||||
response.ShouldContain("+OK\r\n");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Verbose_mode_sends_OK_after_SUB()
|
||||
{
|
||||
using var client = await ConnectClientAsync();
|
||||
|
||||
// Read INFO
|
||||
await ReadUntilAsync(client, "\r\n");
|
||||
|
||||
// Send CONNECT with verbose:true, then SUB
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":true}\r\n"));
|
||||
|
||||
// Read +OK from CONNECT
|
||||
await ReadUntilAsync(client, "+OK\r\n");
|
||||
|
||||
// Send SUB
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\n"));
|
||||
|
||||
// Should receive +OK after SUB
|
||||
var response = await ReadUntilAsync(client, "+OK\r\n");
|
||||
response.ShouldContain("+OK\r\n");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Verbose_mode_sends_OK_after_PUB()
|
||||
{
|
||||
using var client = await ConnectClientAsync();
|
||||
|
||||
// Read INFO
|
||||
await ReadUntilAsync(client, "\r\n");
|
||||
|
||||
// Send CONNECT with verbose:true
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":true}\r\n"));
|
||||
|
||||
// Read +OK from CONNECT
|
||||
await ReadUntilAsync(client, "+OK\r\n");
|
||||
|
||||
// Send PUB
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nHello\r\n"));
|
||||
|
||||
// Should receive +OK after PUB
|
||||
var response = await ReadUntilAsync(client, "+OK\r\n");
|
||||
response.ShouldContain("+OK\r\n");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Non_verbose_mode_does_not_send_OK()
|
||||
{
|
||||
using var client = await ConnectClientAsync();
|
||||
|
||||
// Read INFO
|
||||
await ReadUntilAsync(client, "\r\n");
|
||||
|
||||
// Send CONNECT without verbose (default false), then SUB, then PING
|
||||
await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB foo 1\r\nPING\r\n"));
|
||||
|
||||
// Should receive PONG but NOT +OK
|
||||
var response = await ReadUntilAsync(client, "PONG\r\n");
|
||||
response.ShouldContain("PONG\r\n");
|
||||
response.ShouldNotContain("+OK");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user