Files
2026-03-13 10:08:20 -04:00

313 lines
12 KiB
Markdown

# Parser
`NatsParser` is a stateful byte-level parser that processes NATS protocol commands from a `ReadOnlySequence<byte>` provided by `System.IO.Pipelines`. It is called repeatedly in a read loop until no more complete commands are available in the buffer.
## Key Types
### CommandType
The `CommandType` enum identifies every command the parser can produce:
```csharp
public enum CommandType
{
Ping,
Pong,
Connect,
Info,
Pub,
HPub,
Sub,
Unsub,
Ok,
Err,
}
```
### ParsedCommandView
`ParsedCommandView` is the byte-first parser result used on the hot path. It keeps protocol fields in byte-oriented storage and exposes payload as a `ReadOnlySequence<byte>` so single-segment bodies can flow through without an unconditional copy.
```csharp
public readonly struct ParsedCommandView
{
public CommandType Type { get; init; }
public string? Operation { get; init; }
public ReadOnlyMemory<byte> Subject { get; init; }
public ReadOnlyMemory<byte> ReplyTo { get; init; }
public ReadOnlyMemory<byte> Queue { get; init; }
public ReadOnlyMemory<byte> Sid { get; init; }
public int MaxMessages { get; init; }
public int HeaderSize { get; init; }
public ReadOnlySequence<byte> Payload { get; init; }
}
```
`Subject`, `ReplyTo`, `Queue`, and `Sid` remain ASCII-encoded bytes until a caller explicitly materializes them. `Payload` stays sequence-backed until a caller asks for contiguous memory.
### ParsedCommand
`ParsedCommand` remains the compatibility shape for existing consumers. `TryParse` now delegates through `TryParseView` and materializes strings and contiguous payload memory in one adapter step instead of during every parse branch.
```csharp
public readonly struct ParsedCommand
{
public CommandType Type { get; init; }
public string? Subject { get; init; }
public string? ReplyTo { get; init; }
public string? Queue { get; init; }
public string? Sid { get; init; }
public int MaxMessages { get; init; }
public int HeaderSize { get; init; }
public ReadOnlyMemory<byte> Payload { get; init; }
public static ParsedCommand Simple(CommandType type) => new() { Type = type, MaxMessages = -1 };
}
```
Fields that do not apply to a given command type are left at their default values (`null` for strings, `0` for integers). `MaxMessages` uses `-1` as a sentinel meaning "unset" (relevant for UNSUB with no max). `HeaderSize` is set for HPUB/HMSG; `-1` indicates no headers. `Payload` carries the raw body bytes for PUB/HPUB, and the raw JSON bytes for CONNECT/INFO.
## TryParseView and TryParse
`TryParseView` is the byte-oriented parser entry point. It is called by hot-path consumers such as `NatsClient.ProcessCommandsAsync` when they want to defer materialization.
```csharp
internal bool TryParseView(ref ReadOnlySequence<byte> buffer, out ParsedCommandView command)
```
`TryParse` remains the compatibility entry point for existing call sites and tests:
```csharp
public bool TryParse(ref ReadOnlySequence<byte> buffer, out ParsedCommand command)
```
Both methods return `true` and advance `buffer` past the consumed bytes when a complete command is available. They return `false` — leaving `buffer` unchanged — when more data is needed. The caller must call the parser in a loop until it returns `false`, then call `PipeReader.AdvanceTo` to signal how far the buffer was consumed.
If the parser detects a malformed command it throws `ProtocolViolationException`, which the read loop catches to close the connection.
## Command Identification
After locating the `\r\n` control line terminator, the parser lowercase-normalises the first two bytes using a bitwise OR with `0x20` and dispatches on them. This single branch handles both upper- and lowercase input without a string comparison or allocation.
```csharp
byte b0 = (byte)(lineSpan[0] | 0x20); // lowercase
byte b1 = (byte)(lineSpan[1] | 0x20);
switch (b0)
{
case (byte)'p':
if (b1 == (byte)'i') // PING
{
command = ParsedCommand.Simple(CommandType.Ping);
buffer = buffer.Slice(reader.Position);
return true;
}
if (b1 == (byte)'o') // PONG
{
command = ParsedCommand.Simple(CommandType.Pong);
buffer = buffer.Slice(reader.Position);
return true;
}
if (b1 == (byte)'u') // PUB
{
return ParsePub(lineSpan, ref buffer, reader.Position, out command);
}
break;
case (byte)'h':
if (b1 == (byte)'p') // HPUB
{
return ParseHPub(lineSpan, ref buffer, reader.Position, out command);
}
break;
case (byte)'s':
if (b1 == (byte)'u') // SUB
{
command = ParseSub(lineSpan);
buffer = buffer.Slice(reader.Position);
return true;
}
break;
case (byte)'u':
if (b1 == (byte)'n') // UNSUB
{
command = ParseUnsub(lineSpan);
buffer = buffer.Slice(reader.Position);
return true;
}
break;
case (byte)'c':
if (b1 == (byte)'o') // CONNECT
{
command = ParseConnect(lineSpan);
buffer = buffer.Slice(reader.Position);
return true;
}
break;
case (byte)'i':
if (b1 == (byte)'n') // INFO
{
command = ParseInfo(lineSpan);
buffer = buffer.Slice(reader.Position);
return true;
}
break;
case (byte)'+': // +OK
command = ParsedCommand.Simple(CommandType.Ok);
buffer = buffer.Slice(reader.Position);
return true;
case (byte)'-': // -ERR
command = ParsedCommand.Simple(CommandType.Err);
buffer = buffer.Slice(reader.Position);
return true;
}
throw new ProtocolViolationException("Unknown protocol operation");
```
The two-character pairs are: `p+i` = PING, `p+o` = PONG, `p+u` = PUB, `h+p` = HPUB, `s+u` = SUB, `u+n` = UNSUB, `c+o` = CONNECT, `i+n` = INFO. `+` and `-` are matched on `b0` alone since their second characters are unambiguous.
## Two-Phase Parsing for PUB and HPUB
PUB and HPUB require a payload body that follows the control line. The parser handles split reads — where the TCP segment boundary falls inside the payload — through an `_awaitingPayload` state flag.
**Phase 1 — control line:** The parser reads the control line up to `\r\n`, extracts the subject, optional reply-to, and payload size(s), then stores these in private fields (`_pendingSubject`, `_pendingReplyTo`, `_expectedPayloadSize`, `_pendingHeaderSize`, `_pendingType`) and sets `_awaitingPayload = true`. The pending subject and reply values are held as byte-oriented state, not strings. It then immediately calls `TryReadPayload` to attempt phase 2.
**Phase 2 — payload read:** `TryReadPayload` checks whether `buffer.Length >= _expectedPayloadSize + 2` (the `+ 2` accounts for the trailing `\r\n`). If enough data is present, the parser slices the payload as a `ReadOnlySequence<byte>`, verifies the trailing `\r\n`, constructs a `ParsedCommandView`, and resets `_awaitingPayload` to `false`. If not enough data is present, `TryReadPayload` returns `false` and `_awaitingPayload` remains `true`.
On the next call to `TryParse`, the check at the top of the method routes straight to `TryReadPayload` without re-parsing the control line:
```csharp
if (_awaitingPayload)
return TryReadPayload(ref buffer, out command);
```
This means the parser correctly handles payloads that arrive across multiple `PipeReader.ReadAsync` completions without buffering the control line a second time.
## Materialization Boundaries
The parser now has explicit materialization boundaries:
- `TryParseView` keeps payloads sequence-backed and leaves token fields as bytes.
- `ParsedCommandView.Materialize()` converts byte fields to strings and converts multi-segment payloads to a standalone `byte[]`.
- `NatsClient` consumes `ParsedCommandView` directly for the `PUB` and `HPUB` hot path, only decoding subject and reply strings at the routing and permission-check boundary.
- `CONNECT` and `INFO` now keep their JSON payload as a slice of the original control-line sequence until a consumer explicitly materializes it.
Payload copying is still intentional in two places:
- when a multi-segment payload must become contiguous for a consumer using `ReadOnlyMemory<byte>`
- when compatibility callers continue to use `TryParse` and require a materialized `ParsedCommand`
## Zero-Allocation Argument Splitting
`SplitArgs` splits the argument portion of a control line into token ranges without allocating. The caller `stackalloc`s a `Span<Range>` sized to the maximum expected argument count for the command, then passes it to `SplitArgs`:
```csharp
internal static int SplitArgs(Span<byte> data, Span<Range> ranges)
{
int count = 0;
int start = -1;
for (int i = 0; i < data.Length; i++)
{
byte b = data[i];
if (b is (byte)' ' or (byte)'\t')
{
if (start >= 0)
{
if (count >= ranges.Length)
throw new ProtocolViolationException("Too many arguments");
ranges[count++] = start..i;
start = -1;
}
}
else
{
if (start < 0)
start = i;
}
}
if (start >= 0)
{
if (count >= ranges.Length)
throw new ProtocolViolationException("Too many arguments");
ranges[count++] = start..data.Length;
}
return count;
}
```
The returned `int` is the number of populated entries in `ranges`. Callers index into the original span using those ranges (e.g. `argsSpan[ranges[0]]`) to extract each token as a sub-span, then decode to `string` with `Encoding.ASCII.GetString`. Consecutive whitespace is collapsed: a new token only begins on a non-whitespace byte after one or more whitespace bytes.
## Decimal Integer Parsing
`ParseSize` converts an ASCII decimal integer in a byte span to an `int`. It is used for payload sizes and UNSUB max-message counts.
```csharp
internal static int ParseSize(Span<byte> data)
{
if (data.Length == 0 || data.Length > 9)
return -1;
int n = 0;
foreach (byte b in data)
{
if (b < (byte)'0' || b > (byte)'9')
return -1;
n = n * 10 + (b - '0');
}
return n;
}
```
The length cap of 9 digits prevents overflow without a checked-arithmetic check. A return value of `-1` signals a parse failure; callers treat this as a `ProtocolViolationException`.
## Error Handling
`ProtocolViolationException` is thrown for all malformed input:
- Control line exceeds `MaxControlLineSize` (4096 bytes).
- Unknown command bytes.
- Wrong number of arguments for a command.
- Payload size is negative, exceeds `MaxPayloadSize`, or the trailing `\r\n` after the payload is absent.
- `SplitArgs` receives more tokens than the caller's `ranges` span can hold.
The read loop is responsible for catching `ProtocolViolationException`, sending `-ERR` to the client, and closing the connection.
## Limits
| Limit | Value | Source |
|-------|-------|--------|
| Max control line | 4096 bytes | `NatsProtocol.MaxControlLineSize` |
| Max payload (default) | 1 048 576 bytes | `NatsProtocol.MaxPayloadSize` |
| Max size field digits | 9 | `ParseSize` length check |
The max payload is configurable: `NatsParser` accepts a `maxPayload` constructor argument, which `NatsClient` sets from `NatsOptions`.
## Go Reference
The .NET parser is a direct port of the state machine in `golang/nats-server/server/parser.go`. The Go implementation uses the same two-byte command identification technique and the same two-phase control-line/payload split for PUB and HPUB.
## Related Documentation
- [Protocol Overview](Overview.md)
- [Server Overview](../Server/Overview.md)
<!-- Last verified against codebase: 2026-02-22 -->