using System.Buffers; using System.Text; using System.Text.Json; using Microsoft.Extensions.Logging; namespace NATS.Server.Protocol; public enum CommandType { Ping, Pong, Connect, Info, Pub, HPub, Sub, Unsub, Ok, Err, } public readonly struct ParsedCommand { public CommandType Type { get; init; } public string? Operation { get; init; } public string? Subject { get; init; } public string? ReplyTo { get; init; } public string? Queue { get; init; } public string? Sid { get; init; } public int MaxMessages { get; init; } public int HeaderSize { get; init; } public ReadOnlyMemory Payload { get; init; } public static ParsedCommand Simple(CommandType type, string operation) => new() { Type = type, Operation = operation, MaxMessages = -1 }; } public sealed class NatsParser { private static ReadOnlySpan CrLfBytes => "\r\n"u8; private ILogger? _logger; public ILogger? Logger { set => _logger = value; } // State for split-packet payload reading private bool _awaitingPayload; private int _expectedPayloadSize; private byte[]? _pendingSubject; private byte[]? _pendingReplyTo; private int _pendingHeaderSize; private CommandType _pendingType; private string _pendingOperation = string.Empty; public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize, ILogger? logger = null) { _logger = logger; } private void TraceInOp(string op, ReadOnlySpan arg = default) { if (_logger == null || !_logger.IsEnabled(LogLevel.Trace)) return; if (arg.IsEmpty) _logger.LogTrace("<<- {Op}", op); else _logger.LogTrace("<<- {Op} {Arg}", op, Encoding.ASCII.GetString(arg)); } public bool TryParse(ref ReadOnlySequence buffer, out ParsedCommand command) { command = default; if (!TryParseView(ref buffer, out var view)) return false; command = view.Materialize(); return true; } internal bool TryParseView(ref ReadOnlySequence buffer, out ParsedCommandView command) { command = default; if (_awaitingPayload) return TryReadPayload(ref buffer, out command); // Look for \r\n to find control line var reader = new SequenceReader(buffer); if (!reader.TryReadTo(out ReadOnlySequence line, CrLfBytes)) return false; // Control line size check if (line.Length > NatsProtocol.MaxControlLineSize) { var snippetLength = (int)Math.Min(line.Length, NatsProtocol.MaxControlLineSnippetSize); var snippetBytes = new byte[snippetLength]; line.Slice(0, snippetLength).CopyTo(snippetBytes); var snippet = ProtoSnippet(0, NatsProtocol.MaxControlLineSnippetSize, snippetBytes); throw new ProtocolViolationException( $"Maximum control line exceeded (max={NatsProtocol.MaxControlLineSize}, len={line.Length}, snip={snippet}...)"); } // Get line as contiguous span Span lineSpan = stackalloc byte[(int)line.Length]; line.CopyTo(lineSpan); // Identify command by first bytes if (lineSpan.Length < 2) { if (lineSpan.Length == 1 && lineSpan[0] is (byte)'+') { // partial -- need more data return false; } throw new ProtocolViolationException($"Unknown protocol operation: {ProtoSnippet(lineSpan)}"); } byte b0 = (byte)(lineSpan[0] | 0x20); // lowercase byte b1 = (byte)(lineSpan[1] | 0x20); switch (b0) { case (byte)'p': if (b1 == (byte)'i') // PING { command = ParsedCommandView.Simple(CommandType.Ping, "PING"); buffer = buffer.Slice(reader.Position); TraceInOp("PING"); return true; } if (b1 == (byte)'o') // PONG { command = ParsedCommandView.Simple(CommandType.Pong, "PONG"); buffer = buffer.Slice(reader.Position); TraceInOp("PONG"); return true; } if (b1 == (byte)'u') // PUB { return ParsePub(lineSpan, ref buffer, reader.Position, out command); } break; case (byte)'h': if (b1 == (byte)'p') // HPUB { return ParseHPub(lineSpan, ref buffer, reader.Position, out command); } break; case (byte)'s': if (b1 == (byte)'u') // SUB { command = ParseSub(lineSpan); buffer = buffer.Slice(reader.Position); TraceInOp("SUB", lineSpan[4..]); return true; } break; case (byte)'u': if (b1 == (byte)'n') // UNSUB { command = ParseUnsub(lineSpan); buffer = buffer.Slice(reader.Position); TraceInOp("UNSUB", lineSpan[6..]); return true; } break; case (byte)'c': if (b1 == (byte)'o') // CONNECT { command = ParseConnect(line); buffer = buffer.Slice(reader.Position); TraceInOp("CONNECT"); return true; } break; case (byte)'i': if (b1 == (byte)'n') // INFO { command = ParseInfo(line); buffer = buffer.Slice(reader.Position); TraceInOp("INFO"); return true; } break; case (byte)'+': // +OK command = ParsedCommandView.Simple(CommandType.Ok, "+OK"); buffer = buffer.Slice(reader.Position); TraceInOp("+OK"); return true; case (byte)'-': // -ERR command = ParsedCommandView.Simple(CommandType.Err, "-ERR"); buffer = buffer.Slice(reader.Position); TraceInOp("-ERR"); return true; } throw new ProtocolViolationException($"Unknown protocol operation: {ProtoSnippet(lineSpan)}"); } // Go reference: parser.go protoSnippet(start, max, buf). internal static string ProtoSnippet(int start, int max, ReadOnlySpan buffer) { if (start >= buffer.Length) return "\"\""; var stop = start + max; if (stop > buffer.Length) stop = buffer.Length - 1; if (stop <= start) return "\"\""; var slice = buffer[start..stop]; return JsonSerializer.Serialize(Encoding.ASCII.GetString(slice)); } internal static string ProtoSnippet(ReadOnlySpan buffer) => ProtoSnippet(0, NatsProtocol.ProtoSnippetSize, buffer); private bool ParsePub( Span line, ref ReadOnlySequence buffer, SequencePosition afterLine, out ParsedCommandView command) { command = default; // PUB subject [reply] size -- skip "PUB " Span ranges = stackalloc Range[4]; var argsSpan = line[4..]; int argCount = SplitArgs(argsSpan, ranges); byte[] subject; byte[]? reply = null; int size; if (argCount == 2) { subject = argsSpan[ranges[0]].ToArray(); size = ParseSize(argsSpan[ranges[1]]); } else if (argCount == 3) { subject = argsSpan[ranges[0]].ToArray(); reply = argsSpan[ranges[1]].ToArray(); size = ParseSize(argsSpan[ranges[2]]); } else { throw new ProtocolViolationException("Invalid PUB arguments"); } if (size < 0) throw new ProtocolViolationException("Invalid payload size"); // Now read payload + \r\n (max payload enforcement is done at the client level) buffer = buffer.Slice(afterLine); _awaitingPayload = true; _expectedPayloadSize = size; _pendingSubject = subject; _pendingReplyTo = reply; _pendingHeaderSize = -1; _pendingType = CommandType.Pub; _pendingOperation = "PUB"; TraceInOp("PUB", argsSpan); return TryReadPayload(ref buffer, out command); } private bool ParseHPub( Span line, ref ReadOnlySequence buffer, SequencePosition afterLine, out ParsedCommandView command) { command = default; // HPUB subject [reply] hdr_size total_size -- skip "HPUB " Span ranges = stackalloc Range[5]; var argsSpan = line[5..]; int argCount = SplitArgs(argsSpan, ranges); byte[] subject; byte[]? reply = null; int hdrSize, totalSize; if (argCount == 3) { subject = argsSpan[ranges[0]].ToArray(); hdrSize = ParseSize(argsSpan[ranges[1]]); totalSize = ParseSize(argsSpan[ranges[2]]); } else if (argCount == 4) { subject = argsSpan[ranges[0]].ToArray(); reply = argsSpan[ranges[1]].ToArray(); hdrSize = ParseSize(argsSpan[ranges[2]]); totalSize = ParseSize(argsSpan[ranges[3]]); } else { throw new ProtocolViolationException("Invalid HPUB arguments"); } if (hdrSize < 0 || totalSize < 0 || hdrSize > totalSize) throw new ProtocolViolationException("Invalid HPUB sizes"); buffer = buffer.Slice(afterLine); _awaitingPayload = true; _expectedPayloadSize = totalSize; _pendingSubject = subject; _pendingReplyTo = reply; _pendingHeaderSize = hdrSize; _pendingType = CommandType.HPub; _pendingOperation = "HPUB"; TraceInOp("HPUB", argsSpan); return TryReadPayload(ref buffer, out command); } private bool TryReadPayload(ref ReadOnlySequence buffer, out ParsedCommandView command) { command = default; // Need: _expectedPayloadSize bytes + \r\n long needed = _expectedPayloadSize + 2; // payload + \r\n if (buffer.Length < needed) return false; var payloadSlice = buffer.Slice(0, _expectedPayloadSize); // Verify \r\n after payload var trailer = buffer.Slice(_expectedPayloadSize, 2); Span trailerBytes = stackalloc byte[2]; trailer.CopyTo(trailerBytes); if (trailerBytes[0] != (byte)'\r' || trailerBytes[1] != (byte)'\n') throw new ProtocolViolationException("Expected \\r\\n after payload"); command = new ParsedCommandView { Type = _pendingType, Operation = _pendingOperation, Subject = _pendingSubject, ReplyTo = _pendingReplyTo, Payload = payloadSlice, HeaderSize = _pendingHeaderSize, MaxMessages = -1, }; buffer = buffer.Slice(buffer.GetPosition(needed)); _awaitingPayload = false; _pendingSubject = null; _pendingReplyTo = null; return true; } private static ParsedCommandView ParseSub(Span line) { // SUB subject [queue] sid -- skip "SUB " if (line.Length < 5) throw new ProtocolViolationException("Invalid SUB arguments"); Span ranges = stackalloc Range[4]; var argsSpan = line[4..]; int argCount = SplitArgs(argsSpan, ranges); return argCount switch { 2 => new ParsedCommandView { Type = CommandType.Sub, Operation = "SUB", Subject = CopyBytes(argsSpan[ranges[0]]), Sid = CopyBytes(argsSpan[ranges[1]]), MaxMessages = -1, }, 3 => new ParsedCommandView { Type = CommandType.Sub, Operation = "SUB", Subject = CopyBytes(argsSpan[ranges[0]]), Queue = CopyBytes(argsSpan[ranges[1]]), Sid = CopyBytes(argsSpan[ranges[2]]), MaxMessages = -1, }, _ => throw new ProtocolViolationException("Invalid SUB arguments"), }; } private static ParsedCommandView ParseUnsub(Span line) { // UNSUB sid [max_msgs] -- skip "UNSUB " if (line.Length < 7) throw new ProtocolViolationException("Invalid UNSUB arguments"); Span ranges = stackalloc Range[3]; var argsSpan = line[6..]; int argCount = SplitArgs(argsSpan, ranges); return argCount switch { 1 => new ParsedCommandView { Type = CommandType.Unsub, Operation = "UNSUB", Sid = CopyBytes(argsSpan[ranges[0]]), MaxMessages = -1, }, 2 => new ParsedCommandView { Type = CommandType.Unsub, Operation = "UNSUB", Sid = CopyBytes(argsSpan[ranges[0]]), MaxMessages = ParseSize(argsSpan[ranges[1]]), }, _ => throw new ProtocolViolationException("Invalid UNSUB arguments"), }; } private static ParsedCommandView ParseConnect(ReadOnlySequence line) { var reader = new SequenceReader(line); if (!reader.TryAdvanceTo((byte)' ', advancePastDelimiter: true)) throw new ProtocolViolationException("Invalid CONNECT"); var json = line.Slice(reader.Position); return new ParsedCommandView { Type = CommandType.Connect, Operation = "CONNECT", Payload = json, MaxMessages = -1, }; } private static ParsedCommandView ParseInfo(ReadOnlySequence line) { var reader = new SequenceReader(line); if (!reader.TryAdvanceTo((byte)' ', advancePastDelimiter: true)) throw new ProtocolViolationException("Invalid INFO"); var json = line.Slice(reader.Position); return new ParsedCommandView { Type = CommandType.Info, Operation = "INFO", Payload = json, MaxMessages = -1, }; } private static ReadOnlyMemory CopyBytes(ReadOnlySpan value) => value.IsEmpty ? ReadOnlyMemory.Empty : value.ToArray(); /// /// Parse a decimal integer from ASCII bytes. Returns -1 on error. /// internal static int ParseSize(Span data) { if (data.Length == 0 || data.Length > 9) return -1; int n = 0; foreach (byte b in data) { if (b < (byte)'0' || b > (byte)'9') return -1; n = n * 10 + (b - '0'); } return n; } /// /// Split by spaces/tabs into argument ranges. Returns the number of arguments found. /// Uses Span<Range> for zero-allocation argument splitting. /// internal static int SplitArgs(Span data, Span ranges) { int count = 0; int start = -1; for (int i = 0; i < data.Length; i++) { byte b = data[i]; if (b is (byte)' ' or (byte)'\t') { if (start >= 0) { if (count >= ranges.Length) throw new ProtocolViolationException("Too many arguments"); ranges[count++] = start..i; start = -1; } } else { if (start < 0) start = i; } } if (start >= 0) { if (count >= ranges.Length) throw new ProtocolViolationException("Too many arguments"); ranges[count++] = start..data.Length; } return count; } } public class ProtocolViolationException : Exception { public ProtocolViolationException(string message) : base(message) { } }