diff --git a/src/NATS.Server/Protocol/NatsParser.cs b/src/NATS.Server/Protocol/NatsParser.cs index 55fbf40..85063fc 100644 --- a/src/NATS.Server/Protocol/NatsParser.cs +++ b/src/NATS.Server/Protocol/NatsParser.cs @@ -38,22 +38,20 @@ public readonly struct ParsedCommand public sealed class NatsParser { private static readonly byte[] CrLfBytes = "\r\n"u8.ToArray(); - private readonly int _maxPayload; private ILogger? _logger; public ILogger? Logger { set => _logger = value; } // State for split-packet payload reading private bool _awaitingPayload; private int _expectedPayloadSize; - private string? _pendingSubject; - private string? _pendingReplyTo; + private byte[]? _pendingSubject; + private byte[]? _pendingReplyTo; private int _pendingHeaderSize; private CommandType _pendingType; private string _pendingOperation = string.Empty; public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize, ILogger? logger = null) { - _maxPayload = maxPayload; _logger = logger; } @@ -71,6 +69,17 @@ public sealed class NatsParser { command = default; + if (!TryParseView(ref buffer, out var view)) + return false; + + command = view.Materialize(); + return true; + } + + internal bool TryParseView(ref ReadOnlySequence buffer, out ParsedCommandView command) + { + command = default; + if (_awaitingPayload) return TryReadPayload(ref buffer, out command); @@ -114,7 +123,7 @@ public sealed class NatsParser case (byte)'p': if (b1 == (byte)'i') // PING { - command = ParsedCommand.Simple(CommandType.Ping, "PING"); + command = ParsedCommandView.Simple(CommandType.Ping, "PING"); buffer = buffer.Slice(reader.Position); TraceInOp("PING"); return true; @@ -122,7 +131,7 @@ public sealed class NatsParser if (b1 == (byte)'o') // PONG { - command = ParsedCommand.Simple(CommandType.Pong, "PONG"); + command = ParsedCommandView.Simple(CommandType.Pong, "PONG"); buffer = buffer.Slice(reader.Position); TraceInOp("PONG"); return true; @@ -188,13 +197,13 @@ public sealed class NatsParser break; case (byte)'+': // +OK - command = ParsedCommand.Simple(CommandType.Ok, "+OK"); + command = ParsedCommandView.Simple(CommandType.Ok, "+OK"); buffer = buffer.Slice(reader.Position); TraceInOp("+OK"); return true; case (byte)'-': // -ERR - command = ParsedCommand.Simple(CommandType.Err, "-ERR"); + command = ParsedCommandView.Simple(CommandType.Err, "-ERR"); buffer = buffer.Slice(reader.Position); TraceInOp("-ERR"); return true; @@ -227,7 +236,7 @@ public sealed class NatsParser Span line, ref ReadOnlySequence buffer, SequencePosition afterLine, - out ParsedCommand command) + out ParsedCommandView command) { command = default; @@ -236,19 +245,19 @@ public sealed class NatsParser var argsSpan = line[4..]; int argCount = SplitArgs(argsSpan, ranges); - string subject; - string? reply = null; + byte[] subject; + byte[]? reply = null; int size; if (argCount == 2) { - subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]); + subject = argsSpan[ranges[0]].ToArray(); size = ParseSize(argsSpan[ranges[1]]); } else if (argCount == 3) { - subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]); - reply = Encoding.ASCII.GetString(argsSpan[ranges[1]]); + subject = argsSpan[ranges[0]].ToArray(); + reply = argsSpan[ranges[1]].ToArray(); size = ParseSize(argsSpan[ranges[2]]); } else @@ -277,7 +286,7 @@ public sealed class NatsParser Span line, ref ReadOnlySequence buffer, SequencePosition afterLine, - out ParsedCommand command) + out ParsedCommandView command) { command = default; @@ -286,20 +295,20 @@ public sealed class NatsParser var argsSpan = line[5..]; int argCount = SplitArgs(argsSpan, ranges); - string subject; - string? reply = null; + byte[] subject; + byte[]? reply = null; int hdrSize, totalSize; if (argCount == 3) { - subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]); + subject = argsSpan[ranges[0]].ToArray(); hdrSize = ParseSize(argsSpan[ranges[1]]); totalSize = ParseSize(argsSpan[ranges[2]]); } else if (argCount == 4) { - subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]); - reply = Encoding.ASCII.GetString(argsSpan[ranges[1]]); + subject = argsSpan[ranges[0]].ToArray(); + reply = argsSpan[ranges[1]].ToArray(); hdrSize = ParseSize(argsSpan[ranges[2]]); totalSize = ParseSize(argsSpan[ranges[3]]); } @@ -324,7 +333,7 @@ public sealed class NatsParser return TryReadPayload(ref buffer, out command); } - private bool TryReadPayload(ref ReadOnlySequence buffer, out ParsedCommand command) + private bool TryReadPayload(ref ReadOnlySequence buffer, out ParsedCommandView command) { command = default; @@ -333,10 +342,7 @@ public sealed class NatsParser if (buffer.Length < needed) return false; - // Extract payload var payloadSlice = buffer.Slice(0, _expectedPayloadSize); - var payload = new byte[_expectedPayloadSize]; - payloadSlice.CopyTo(payload); // Verify \r\n after payload var trailer = buffer.Slice(_expectedPayloadSize, 2); @@ -345,23 +351,25 @@ public sealed class NatsParser if (trailerBytes[0] != (byte)'\r' || trailerBytes[1] != (byte)'\n') throw new ProtocolViolationException("Expected \\r\\n after payload"); - command = new ParsedCommand + command = new ParsedCommandView { Type = _pendingType, Operation = _pendingOperation, Subject = _pendingSubject, ReplyTo = _pendingReplyTo, - Payload = payload, + Payload = payloadSlice, HeaderSize = _pendingHeaderSize, MaxMessages = -1, }; buffer = buffer.Slice(buffer.GetPosition(needed)); _awaitingPayload = false; + _pendingSubject = null; + _pendingReplyTo = null; return true; } - private static ParsedCommand ParseSub(Span line) + private static ParsedCommandView ParseSub(Span line) { // SUB subject [queue] sid -- skip "SUB " if (line.Length < 5) @@ -372,28 +380,28 @@ public sealed class NatsParser return argCount switch { - 2 => new ParsedCommand + 2 => new ParsedCommandView { Type = CommandType.Sub, Operation = "SUB", - Subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]), - Sid = Encoding.ASCII.GetString(argsSpan[ranges[1]]), + Subject = CopyBytes(argsSpan[ranges[0]]), + Sid = CopyBytes(argsSpan[ranges[1]]), MaxMessages = -1, }, - 3 => new ParsedCommand + 3 => new ParsedCommandView { Type = CommandType.Sub, Operation = "SUB", - Subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]), - Queue = Encoding.ASCII.GetString(argsSpan[ranges[1]]), - Sid = Encoding.ASCII.GetString(argsSpan[ranges[2]]), + Subject = CopyBytes(argsSpan[ranges[0]]), + Queue = CopyBytes(argsSpan[ranges[1]]), + Sid = CopyBytes(argsSpan[ranges[2]]), MaxMessages = -1, }, _ => throw new ProtocolViolationException("Invalid SUB arguments"), }; } - private static ParsedCommand ParseUnsub(Span line) + private static ParsedCommandView ParseUnsub(Span line) { // UNSUB sid [max_msgs] -- skip "UNSUB " if (line.Length < 7) @@ -404,25 +412,25 @@ public sealed class NatsParser return argCount switch { - 1 => new ParsedCommand + 1 => new ParsedCommandView { Type = CommandType.Unsub, Operation = "UNSUB", - Sid = Encoding.ASCII.GetString(argsSpan[ranges[0]]), + Sid = CopyBytes(argsSpan[ranges[0]]), MaxMessages = -1, }, - 2 => new ParsedCommand + 2 => new ParsedCommandView { Type = CommandType.Unsub, Operation = "UNSUB", - Sid = Encoding.ASCII.GetString(argsSpan[ranges[0]]), + Sid = CopyBytes(argsSpan[ranges[0]]), MaxMessages = ParseSize(argsSpan[ranges[1]]), }, _ => throw new ProtocolViolationException("Invalid UNSUB arguments"), }; } - private static ParsedCommand ParseConnect(Span line) + private static ParsedCommandView ParseConnect(Span line) { // CONNECT {json} -- find first space after command int spaceIdx = line.IndexOf((byte)' '); @@ -430,16 +438,16 @@ public sealed class NatsParser throw new ProtocolViolationException("Invalid CONNECT"); var json = line[(spaceIdx + 1)..]; - return new ParsedCommand + return new ParsedCommandView { Type = CommandType.Connect, Operation = "CONNECT", - Payload = json.ToArray(), + Payload = new ReadOnlySequence(json.ToArray()), MaxMessages = -1, }; } - private static ParsedCommand ParseInfo(Span line) + private static ParsedCommandView ParseInfo(Span line) { // INFO {json} -- find first space after command int spaceIdx = line.IndexOf((byte)' '); @@ -447,15 +455,18 @@ public sealed class NatsParser throw new ProtocolViolationException("Invalid INFO"); var json = line[(spaceIdx + 1)..]; - return new ParsedCommand + return new ParsedCommandView { Type = CommandType.Info, Operation = "INFO", - Payload = json.ToArray(), + Payload = new ReadOnlySequence(json.ToArray()), MaxMessages = -1, }; } + private static ReadOnlyMemory CopyBytes(ReadOnlySpan value) => + value.IsEmpty ? ReadOnlyMemory.Empty : value.ToArray(); + /// /// Parse a decimal integer from ASCII bytes. Returns -1 on error. /// diff --git a/src/NATS.Server/Protocol/ParsedCommandView.cs b/src/NATS.Server/Protocol/ParsedCommandView.cs new file mode 100644 index 0000000..799c6fc --- /dev/null +++ b/src/NATS.Server/Protocol/ParsedCommandView.cs @@ -0,0 +1,45 @@ +using System.Buffers; +using System.Text; + +namespace NATS.Server.Protocol; + +public readonly struct ParsedCommandView +{ + public CommandType Type { get; init; } + public string? Operation { get; init; } + public ReadOnlyMemory Subject { get; init; } + public ReadOnlyMemory ReplyTo { get; init; } + public ReadOnlyMemory Queue { get; init; } + public ReadOnlyMemory Sid { get; init; } + public int MaxMessages { get; init; } + public int HeaderSize { get; init; } + public ReadOnlySequence Payload { get; init; } + + public static ParsedCommandView Simple(CommandType type, string operation) => + new() { Type = type, Operation = operation, MaxMessages = -1 }; + + public ParsedCommand Materialize() => + new() + { + Type = Type, + Operation = Operation, + Subject = DecodeAsciiOrNull(Subject), + ReplyTo = DecodeAsciiOrNull(ReplyTo), + Queue = DecodeAsciiOrNull(Queue), + Sid = DecodeAsciiOrNull(Sid), + MaxMessages = MaxMessages, + HeaderSize = HeaderSize, + Payload = MaterializePayload(), + }; + + private ReadOnlyMemory MaterializePayload() + { + if (Payload.IsEmpty) + return ReadOnlyMemory.Empty; + + return Payload.IsSingleSegment ? Payload.First : Payload.ToArray(); + } + + private static string? DecodeAsciiOrNull(ReadOnlyMemory value) => + value.IsEmpty ? null : Encoding.ASCII.GetString(value.Span); +}