feat: implement NATS protocol parser with System.IO.Pipelines
Add NatsParser that reads NATS protocol commands from ReadOnlySequence<byte>. Identifies commands by first 2 bytes using case-insensitive bit masking. Handles PUB/HPUB payload reading with stateful _awaitingPayload for split-packet scenarios. Uses Span<Range> for zero-allocation argument splitting and ParseSize for ASCII decimal parsing. Includes CommandType enum, ParsedCommand struct, and ProtocolViolationException. 14 tests covering PING, PONG, CONNECT, INFO, SUB (with/without queue), UNSUB (with/without max), PUB (with/without reply, zero payload), HPUB, multiple commands, and case insensitivity.
This commit is contained in:
177
tests/NATS.Server.Tests/ParserTests.cs
Normal file
177
tests/NATS.Server.Tests/ParserTests.cs
Normal file
@@ -0,0 +1,177 @@
|
||||
using System.Buffers;
|
||||
using System.IO.Pipelines;
|
||||
using System.Text;
|
||||
using NATS.Server.Protocol;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
public class ParserTests
|
||||
{
|
||||
private static async Task<List<ParsedCommand>> ParseAsync(string input)
|
||||
{
|
||||
var pipe = new Pipe();
|
||||
var commands = new List<ParsedCommand>();
|
||||
|
||||
// Write input to pipe
|
||||
var bytes = Encoding.ASCII.GetBytes(input);
|
||||
await pipe.Writer.WriteAsync(bytes);
|
||||
pipe.Writer.Complete();
|
||||
|
||||
// Parse from pipe
|
||||
var parser = new NatsParser(maxPayload: NatsProtocol.MaxPayloadSize);
|
||||
while (true)
|
||||
{
|
||||
var result = await pipe.Reader.ReadAsync();
|
||||
var buffer = result.Buffer;
|
||||
|
||||
while (parser.TryParse(ref buffer, out var cmd))
|
||||
commands.Add(cmd);
|
||||
|
||||
pipe.Reader.AdvanceTo(buffer.Start, buffer.End);
|
||||
|
||||
if (result.IsCompleted)
|
||||
break;
|
||||
}
|
||||
|
||||
return commands;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_PING()
|
||||
{
|
||||
var cmds = await ParseAsync("PING\r\n");
|
||||
Assert.Single(cmds);
|
||||
Assert.Equal(CommandType.Ping, cmds[0].Type);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_PONG()
|
||||
{
|
||||
var cmds = await ParseAsync("PONG\r\n");
|
||||
Assert.Single(cmds);
|
||||
Assert.Equal(CommandType.Pong, cmds[0].Type);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_CONNECT()
|
||||
{
|
||||
var cmds = await ParseAsync("CONNECT {\"verbose\":false,\"echo\":true}\r\n");
|
||||
Assert.Single(cmds);
|
||||
Assert.Equal(CommandType.Connect, cmds[0].Type);
|
||||
Assert.Contains("verbose", Encoding.ASCII.GetString(cmds[0].Payload.ToArray()));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_SUB_without_queue()
|
||||
{
|
||||
var cmds = await ParseAsync("SUB foo 1\r\n");
|
||||
Assert.Single(cmds);
|
||||
Assert.Equal(CommandType.Sub, cmds[0].Type);
|
||||
Assert.Equal("foo", cmds[0].Subject);
|
||||
Assert.Null(cmds[0].Queue);
|
||||
Assert.Equal("1", cmds[0].Sid);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_SUB_with_queue()
|
||||
{
|
||||
var cmds = await ParseAsync("SUB foo workers 1\r\n");
|
||||
Assert.Single(cmds);
|
||||
Assert.Equal(CommandType.Sub, cmds[0].Type);
|
||||
Assert.Equal("foo", cmds[0].Subject);
|
||||
Assert.Equal("workers", cmds[0].Queue);
|
||||
Assert.Equal("1", cmds[0].Sid);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_UNSUB()
|
||||
{
|
||||
var cmds = await ParseAsync("UNSUB 1\r\n");
|
||||
Assert.Single(cmds);
|
||||
Assert.Equal(CommandType.Unsub, cmds[0].Type);
|
||||
Assert.Equal("1", cmds[0].Sid);
|
||||
Assert.Equal(-1, cmds[0].MaxMessages);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_UNSUB_with_max()
|
||||
{
|
||||
var cmds = await ParseAsync("UNSUB 1 5\r\n");
|
||||
Assert.Single(cmds);
|
||||
Assert.Equal(CommandType.Unsub, cmds[0].Type);
|
||||
Assert.Equal("1", cmds[0].Sid);
|
||||
Assert.Equal(5, cmds[0].MaxMessages);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_PUB_with_payload()
|
||||
{
|
||||
var cmds = await ParseAsync("PUB foo 5\r\nHello\r\n");
|
||||
Assert.Single(cmds);
|
||||
Assert.Equal(CommandType.Pub, cmds[0].Type);
|
||||
Assert.Equal("foo", cmds[0].Subject);
|
||||
Assert.Null(cmds[0].ReplyTo);
|
||||
Assert.Equal("Hello", Encoding.ASCII.GetString(cmds[0].Payload.ToArray()));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_PUB_with_reply()
|
||||
{
|
||||
var cmds = await ParseAsync("PUB foo reply 5\r\nHello\r\n");
|
||||
Assert.Single(cmds);
|
||||
Assert.Equal(CommandType.Pub, cmds[0].Type);
|
||||
Assert.Equal("foo", cmds[0].Subject);
|
||||
Assert.Equal("reply", cmds[0].ReplyTo);
|
||||
Assert.Equal("Hello", Encoding.ASCII.GetString(cmds[0].Payload.ToArray()));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_multiple_commands()
|
||||
{
|
||||
var cmds = await ParseAsync("PING\r\nPONG\r\nSUB foo 1\r\n");
|
||||
Assert.Equal(3, cmds.Count);
|
||||
Assert.Equal(CommandType.Ping, cmds[0].Type);
|
||||
Assert.Equal(CommandType.Pong, cmds[1].Type);
|
||||
Assert.Equal(CommandType.Sub, cmds[2].Type);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_PUB_zero_payload()
|
||||
{
|
||||
var cmds = await ParseAsync("PUB foo 0\r\n\r\n");
|
||||
Assert.Single(cmds);
|
||||
Assert.Equal(CommandType.Pub, cmds[0].Type);
|
||||
Assert.Empty(cmds[0].Payload.ToArray());
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_case_insensitive()
|
||||
{
|
||||
var cmds = await ParseAsync("ping\r\npub FOO 3\r\nabc\r\n");
|
||||
Assert.Equal(2, cmds.Count);
|
||||
Assert.Equal(CommandType.Ping, cmds[0].Type);
|
||||
Assert.Equal(CommandType.Pub, cmds[1].Type);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_HPUB()
|
||||
{
|
||||
// HPUB subject 12 17\r\nNATS/1.0\r\n\r\nHello\r\n
|
||||
var header = "NATS/1.0\r\n\r\n";
|
||||
var payload = "Hello";
|
||||
var total = header.Length + payload.Length;
|
||||
var cmds = await ParseAsync($"HPUB foo {header.Length} {total}\r\n{header}{payload}\r\n");
|
||||
Assert.Single(cmds);
|
||||
Assert.Equal(CommandType.HPub, cmds[0].Type);
|
||||
Assert.Equal("foo", cmds[0].Subject);
|
||||
Assert.Equal(header.Length, cmds[0].HeaderSize);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Parse_INFO()
|
||||
{
|
||||
var cmds = await ParseAsync("INFO {\"server_id\":\"test\"}\r\n");
|
||||
Assert.Single(cmds);
|
||||
Assert.Equal(CommandType.Info, cmds[0].Type);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user