diff --git a/Documentation/Protocol/Parser.md b/Documentation/Protocol/Parser.md index d782b19..e76d8f2 100644 --- a/Documentation/Protocol/Parser.md +++ b/Documentation/Protocol/Parser.md @@ -24,9 +24,30 @@ public enum CommandType } ``` +### ParsedCommandView + +`ParsedCommandView` is the byte-first parser result used on the hot path. It keeps protocol fields in byte-oriented storage and exposes payload as a `ReadOnlySequence<byte>` so single-segment bodies can flow through without an unconditional copy. + +```csharp +public readonly struct ParsedCommandView +{ + public CommandType Type { get; init; } + public string? Operation { get; init; } + public ReadOnlyMemory<byte> Subject { get; init; } + public ReadOnlyMemory<byte> ReplyTo { get; init; } + public ReadOnlyMemory<byte> Queue { get; init; } + public ReadOnlyMemory<byte> Sid { get; init; } + public int MaxMessages { get; init; } + public int HeaderSize { get; init; } + public ReadOnlySequence<byte> Payload { get; init; } +} +``` + +`Subject`, `ReplyTo`, `Queue`, and `Sid` remain ASCII-encoded bytes until a caller explicitly materializes them. `Payload` stays sequence-backed until a caller asks for contiguous memory. + ### ParsedCommand -`ParsedCommand` is a `readonly struct` that carries the result of a successful parse. Using a struct avoids a heap allocation per command on the fast path. +`ParsedCommand` remains the compatibility shape for existing consumers. `TryParse` now delegates through `TryParseView` and materializes strings and contiguous payload memory in one adapter step instead of during every parse branch. ```csharp public readonly struct ParsedCommand @@ -46,15 +67,21 @@ public readonly struct ParsedCommand Fields that do not apply to a given command type are left at their default values (`null` for strings, `0` for integers). `MaxMessages` uses `-1` as a sentinel meaning "unset" (relevant for UNSUB with no max). `HeaderSize` is set for HPUB/HMSG; `-1` indicates no headers. 
`Payload` carries the raw body bytes for PUB/HPUB, and the raw JSON bytes for CONNECT/INFO. -## TryParse +## TryParseView and TryParse -`TryParse` is the main entry point. It is called by the read loop after each `PipeReader.ReadAsync` completes. +`TryParseView` is the byte-oriented parser entry point. It is called by hot-path consumers such as `NatsClient.ProcessCommandsAsync` when they want to defer materialization. + +```csharp +internal bool TryParseView(ref ReadOnlySequence<byte> buffer, out ParsedCommandView command) +``` + +`TryParse` remains the compatibility entry point for existing call sites and tests: ```csharp public bool TryParse(ref ReadOnlySequence<byte> buffer, out ParsedCommand command) ``` -The method returns `true` and advances `buffer` past the consumed bytes when a complete command is available. It returns `false` — leaving `buffer` unchanged — when more data is needed. The caller must call `TryParse` in a loop until it returns `false`, then call `PipeReader.AdvanceTo` to signal how far the buffer was consumed. +Both methods return `true` and advance `buffer` past the consumed bytes when a complete command is available. They return `false` when more data is needed — leaving `buffer` unchanged, except that a complete PUB/HPUB control line is consumed before the parser waits for its payload. The caller must call the parser in a loop until it returns `false`, then call `PipeReader.AdvanceTo` to signal how far the buffer was consumed. If the parser detects a malformed command it throws `ProtocolViolationException`, which the read loop catches to close the connection. @@ -158,9 +185,9 @@ The two-character pairs are: `p+i` = PING, `p+o` = PONG, `p+u` = PUB, `h+p` = HP PUB and HPUB require a payload body that follows the control line. The parser handles split reads — where the TCP segment boundary falls inside the payload — through an `_awaitingPayload` state flag. 
-**Phase 1 — control line:** The parser reads the control line up to `\r\n`, extracts the subject, optional reply-to, and payload size(s), then stores these in private fields (`_pendingSubject`, `_pendingReplyTo`, `_expectedPayloadSize`, `_pendingHeaderSize`, `_pendingType`) and sets `_awaitingPayload = true`. It then immediately calls `TryReadPayload` to attempt phase 2. +**Phase 1 — control line:** The parser reads the control line up to `\r\n`, extracts the subject, optional reply-to, and payload size(s), then stores these in private fields (`_pendingSubject`, `_pendingReplyTo`, `_expectedPayloadSize`, `_pendingHeaderSize`, `_pendingType`) and sets `_awaitingPayload = true`. The pending subject and reply values are held as byte-oriented state, not strings. It then immediately calls `TryReadPayload` to attempt phase 2. -**Phase 2 — payload read:** `TryReadPayload` checks whether `buffer.Length >= _expectedPayloadSize + 2` (the `+ 2` accounts for the trailing `\r\n`). If enough data is present, the payload bytes are copied to a new `byte[]`, the trailing `\r\n` is verified, the `ParsedCommand` is constructed, and `_awaitingPayload` is reset to `false`. If not enough data is present, `TryReadPayload` returns `false` and `_awaitingPayload` remains `true`. +**Phase 2 — payload read:** `TryReadPayload` checks whether `buffer.Length >= _expectedPayloadSize + 2` (the `+ 2` accounts for the trailing `\r\n`). If enough data is present, the parser slices the payload as a `ReadOnlySequence<byte>`, verifies the trailing `\r\n`, constructs a `ParsedCommandView`, and resets `_awaitingPayload` to `false`. If not enough data is present, `TryReadPayload` returns `false` and `_awaitingPayload` remains `true`. 
On the next call to `TryParse`, the check at the top of the method routes straight to `TryReadPayload` without re-parsing the control line: @@ -171,6 +198,20 @@ if (_awaitingPayload) This means the parser correctly handles payloads that arrive across multiple `PipeReader.ReadAsync` completions without buffering the control line a second time. +## Materialization Boundaries + +The parser now has explicit materialization boundaries: + +- `TryParseView` keeps payloads sequence-backed and leaves token fields as bytes. +- `ParsedCommandView.Materialize()` converts byte fields to strings and converts multi-segment payloads to a standalone `byte[]`. +- `NatsClient` consumes `ParsedCommandView` directly for the `PUB` and `HPUB` hot path, only decoding subject and reply strings at the routing and permission-check boundary. +- `CONNECT` and `INFO` now keep their JSON payload as a slice of the original control-line sequence until a consumer explicitly materializes it. + +Payload copying is still intentional in two places: + +- when a multi-segment payload must become contiguous for a consumer using `ReadOnlyMemory<byte>` +- when compatibility callers continue to use `TryParse` and require a materialized `ParsedCommand` + ## Zero-Allocation Argument Splitting `SplitArgs` splits the argument portion of a control line into token ranges without allocating. The caller `stackalloc`s a `Span<Range>` sized to the maximum expected argument count for the command, then passes it to `SplitArgs`: diff --git a/benchmarks_comparison.md b/benchmarks_comparison.md index eefb9df..1c98912 100644 --- a/benchmarks_comparison.md +++ b/benchmarks_comparison.md @@ -1,8 +1,46 @@ # Go vs .NET NATS Server — Benchmark Comparison -Benchmark run: 2026-03-13. Both servers running on the same machine, tested with identical NATS.Client.Core workloads. Test parallelization disabled to avoid resource contention. Best-of-3 runs reported. +Benchmark run: 2026-03-13 10:06 AM America/Indiana/Indianapolis. 
The latest refresh used the benchmark project README command (`dotnet test tests/NATS.Server.Benchmark.Tests --filter "Category=Benchmark" -v normal --logger "console;verbosity=detailed"`) and completed successfully as a `.NET`-only run. The Go/.NET comparison tables below remain the last Go-capable comparison baseline. -**Environment:** Apple M4, .NET 10, Go nats-server (latest from `golang/nats-server/`). +**Environment:** Apple M4, .NET SDK 10.0.101, README benchmark command run in the benchmark project's default `Debug` configuration, Go toolchain installed but the current full-suite run emitted only `.NET` result blocks. + +--- + +## Latest README Run (.NET only) + +The current refresh came from `/tmp/bench-output.txt` using the benchmark project README workflow. Because the run did not emit any Go comparison blocks, the values below are the latest `.NET`-only numbers from that run, and the historical Go/.NET comparison tables are preserved below instead of being overwritten with mixed-source ratios. 
+ +### Core and JetStream + +| Benchmark | .NET msg/s | .NET MB/s | Notes | +|-----------|------------|-----------|-------| +| Single Publisher (16B) | 1,392,442 | 21.2 | README full-suite run | +| Single Publisher (128B) | 1,491,226 | 182.0 | README full-suite run | +| PubSub 1:1 (16B) | 717,731 | 11.0 | README full-suite run | +| PubSub 1:1 (16KB) | 28,450 | 444.5 | README full-suite run | +| Fan-Out 1:4 (128B) | 1,451,748 | 177.2 | README full-suite run | +| Multi 4Px4S (128B) | 244,878 | 29.9 | README full-suite run | +| Request-Reply Single (128B) | 6,840 | 0.8 | P50 142.5 us, P99 203.9 us | +| Request-Reply 10Cx2S (16B) | 22,844 | 0.3 | P50 421.1 us, P99 602.1 us | +| JS Sync Publish (16B Memory) | 12,619 | 0.2 | README full-suite run | +| JS Async Publish (128B File) | 46,631 | 5.7 | README full-suite run | +| JS Ordered Consumer (128B) | 108,057 | 13.2 | README full-suite run | +| JS Durable Fetch (128B) | 490,090 | 59.8 | README full-suite run | + +### Parser Microbenchmarks + +| Benchmark | Ops/s | MB/s | Alloc | +|-----------|-------|------|-------| +| Parser PING | 5,756,370 | 32.9 | 0.0 B/op | +| Parser PUB | 2,537,973 | 96.8 | 40.0 B/op | +| Parser HPUB | 2,298,811 | 122.8 | 40.0 B/op | +| Parser PUB split payload | 2,049,535 | 78.2 | 176.0 B/op | + +### Current Run Highlights + +1. The parser microbenchmarks show the hot path is already at zero allocation for `PING`, with contiguous `PUB` and `HPUB` still paying a small fixed cost for retained field copies. +2. Split-payload `PUB` remains meaningfully more allocation-heavy than contiguous `PUB` because the parser must preserve unread payload state across reads and then materialize contiguous memory at the current client boundary. +3. The README-driven suite was a `.NET`-only refresh, so the comparative Go/.NET ratios below should still be treated as the last Go-capable baseline rather than current same-run ratios. 
--- diff --git a/src/NATS.Server/NATS.Server.csproj b/src/NATS.Server/NATS.Server.csproj index 395f186..9ef09e3 100644 --- a/src/NATS.Server/NATS.Server.csproj +++ b/src/NATS.Server/NATS.Server.csproj @@ -10,6 +10,7 @@ + diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index cea518f..6ca50b8 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -407,23 +407,23 @@ public sealed class NatsClient : INatsClient, IDisposable long localInMsgs = 0; long localInBytes = 0; - while (_parser.TryParse(ref buffer, out var cmd)) + while (_parser.TryParseView(ref buffer, out var cmdView)) { Interlocked.Exchange(ref _lastIn, Environment.TickCount64); // Handle Pub/HPub inline to allow ref parameter passing for stat batching. // DispatchCommandAsync is async and cannot accept ref parameters. - if (cmd.Type is CommandType.Pub or CommandType.HPub + if (cmdView.Type is CommandType.Pub or CommandType.HPub && (!_authService.IsAuthRequired || ConnectReceived)) { Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks); - ProcessPub(cmd, ref localInMsgs, ref localInBytes); + ProcessPub(cmdView, ref localInMsgs, ref localInBytes); if (ClientOpts?.Verbose == true) WriteProtocol(NatsProtocol.OkBytes); } else { - await DispatchCommandAsync(cmd, ct); + await DispatchCommandAsync(cmdView.Materialize(), ct); } } @@ -702,46 +702,50 @@ public sealed class NatsClient : INatsClient, IDisposable server.OnLocalUnsubscription(Account?.Name ?? 
Account.GlobalAccountName, sub.Subject, sub.Queue); } - private void ProcessPub(ParsedCommand cmd, ref long localInMsgs, ref long localInBytes) + private void ProcessPub(ParsedCommandView cmd, ref long localInMsgs, ref long localInBytes) { + var payloadMemory = cmd.GetPayloadMemory(); localInMsgs++; - localInBytes += cmd.Payload.Length; + localInBytes += payloadMemory.Length; // Max payload validation (always, hard close) - if (cmd.Payload.Length > _options.MaxPayload) + if (payloadMemory.Length > _options.MaxPayload) { _logger.LogWarning("Client {ClientId} exceeded max payload: {Size} > {MaxPayload}", - Id, cmd.Payload.Length, _options.MaxPayload); + Id, payloadMemory.Length, _options.MaxPayload); _ = SendErrAndCloseAsync(NatsProtocol.ErrMaxPayloadViolation, ClientClosedReason.MaxPayloadExceeded); return; } + var subject = Encoding.ASCII.GetString(cmd.Subject.Span); + // Pedantic mode: validate publish subject - if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(cmd.Subject!)) + if (ClientOpts?.Pedantic == true && !SubjectMatch.IsValidPublishSubject(subject)) { - _logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, cmd.Subject); + _logger.LogDebug("Client {ClientId} invalid publish subject: {Subject}", Id, subject); SendErr(NatsProtocol.ErrInvalidPublishSubject); return; } // Permission check for publish - if (_permissions != null && !_permissions.IsPublishAllowed(cmd.Subject!)) + if (_permissions != null && !_permissions.IsPublishAllowed(subject)) { - _logger.LogDebug("Client {ClientId} publish permission denied for {Subject}", Id, cmd.Subject); + _logger.LogDebug("Client {ClientId} publish permission denied for {Subject}", Id, subject); SendErr(NatsProtocol.ErrPermissionsPublish); return; } ReadOnlyMemory headers = default; - ReadOnlyMemory payload = cmd.Payload; + ReadOnlyMemory payload = payloadMemory; if (cmd.Type == CommandType.HPub && cmd.HeaderSize > 0) { - headers = cmd.Payload[..cmd.HeaderSize]; - payload = 
cmd.Payload[cmd.HeaderSize..]; + headers = payloadMemory[..cmd.HeaderSize]; + payload = payloadMemory[cmd.HeaderSize..]; } - Router?.ProcessMessage(cmd.Subject!, cmd.ReplyTo, headers, payload, this); + var replyTo = cmd.ReplyTo.IsEmpty ? null : Encoding.ASCII.GetString(cmd.ReplyTo.Span); + Router?.ProcessMessage(subject, replyTo, headers, payload, this); } public void RecordJetStreamPubAck(PubAck ack) diff --git a/src/NATS.Server/Protocol/NatsParser.cs b/src/NATS.Server/Protocol/NatsParser.cs index 55fbf40..eb23958 100644 --- a/src/NATS.Server/Protocol/NatsParser.cs +++ b/src/NATS.Server/Protocol/NatsParser.cs @@ -37,23 +37,21 @@ public readonly struct ParsedCommand public sealed class NatsParser { - private static readonly byte[] CrLfBytes = "\r\n"u8.ToArray(); - private readonly int _maxPayload; + private static ReadOnlySpan CrLfBytes => "\r\n"u8; private ILogger? _logger; public ILogger? Logger { set => _logger = value; } // State for split-packet payload reading private bool _awaitingPayload; private int _expectedPayloadSize; - private string? _pendingSubject; - private string? _pendingReplyTo; + private byte[]? _pendingSubject; + private byte[]? _pendingReplyTo; private int _pendingHeaderSize; private CommandType _pendingType; private string _pendingOperation = string.Empty; public NatsParser(int maxPayload = NatsProtocol.MaxPayloadSize, ILogger? 
logger = null) { - _maxPayload = maxPayload; _logger = logger; } @@ -71,12 +69,23 @@ public sealed class NatsParser { command = default; + if (!TryParseView(ref buffer, out var view)) + return false; + + command = view.Materialize(); + return true; + } + + internal bool TryParseView(ref ReadOnlySequence buffer, out ParsedCommandView command) + { + command = default; + if (_awaitingPayload) return TryReadPayload(ref buffer, out command); // Look for \r\n to find control line var reader = new SequenceReader(buffer); - if (!reader.TryReadTo(out ReadOnlySequence line, CrLfBytes.AsSpan())) + if (!reader.TryReadTo(out ReadOnlySequence line, CrLfBytes)) return false; // Control line size check @@ -114,7 +123,7 @@ public sealed class NatsParser case (byte)'p': if (b1 == (byte)'i') // PING { - command = ParsedCommand.Simple(CommandType.Ping, "PING"); + command = ParsedCommandView.Simple(CommandType.Ping, "PING"); buffer = buffer.Slice(reader.Position); TraceInOp("PING"); return true; @@ -122,7 +131,7 @@ public sealed class NatsParser if (b1 == (byte)'o') // PONG { - command = ParsedCommand.Simple(CommandType.Pong, "PONG"); + command = ParsedCommandView.Simple(CommandType.Pong, "PONG"); buffer = buffer.Slice(reader.Position); TraceInOp("PONG"); return true; @@ -168,7 +177,7 @@ public sealed class NatsParser case (byte)'c': if (b1 == (byte)'o') // CONNECT { - command = ParseConnect(lineSpan); + command = ParseConnect(line); buffer = buffer.Slice(reader.Position); TraceInOp("CONNECT"); return true; @@ -179,7 +188,7 @@ public sealed class NatsParser case (byte)'i': if (b1 == (byte)'n') // INFO { - command = ParseInfo(lineSpan); + command = ParseInfo(line); buffer = buffer.Slice(reader.Position); TraceInOp("INFO"); return true; @@ -188,13 +197,13 @@ public sealed class NatsParser break; case (byte)'+': // +OK - command = ParsedCommand.Simple(CommandType.Ok, "+OK"); + command = ParsedCommandView.Simple(CommandType.Ok, "+OK"); buffer = buffer.Slice(reader.Position); 
TraceInOp("+OK"); return true; case (byte)'-': // -ERR - command = ParsedCommand.Simple(CommandType.Err, "-ERR"); + command = ParsedCommandView.Simple(CommandType.Err, "-ERR"); buffer = buffer.Slice(reader.Position); TraceInOp("-ERR"); return true; @@ -227,7 +236,7 @@ public sealed class NatsParser Span line, ref ReadOnlySequence buffer, SequencePosition afterLine, - out ParsedCommand command) + out ParsedCommandView command) { command = default; @@ -236,19 +245,19 @@ public sealed class NatsParser var argsSpan = line[4..]; int argCount = SplitArgs(argsSpan, ranges); - string subject; - string? reply = null; + byte[] subject; + byte[]? reply = null; int size; if (argCount == 2) { - subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]); + subject = argsSpan[ranges[0]].ToArray(); size = ParseSize(argsSpan[ranges[1]]); } else if (argCount == 3) { - subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]); - reply = Encoding.ASCII.GetString(argsSpan[ranges[1]]); + subject = argsSpan[ranges[0]].ToArray(); + reply = argsSpan[ranges[1]].ToArray(); size = ParseSize(argsSpan[ranges[2]]); } else @@ -277,7 +286,7 @@ public sealed class NatsParser Span line, ref ReadOnlySequence buffer, SequencePosition afterLine, - out ParsedCommand command) + out ParsedCommandView command) { command = default; @@ -286,20 +295,20 @@ public sealed class NatsParser var argsSpan = line[5..]; int argCount = SplitArgs(argsSpan, ranges); - string subject; - string? reply = null; + byte[] subject; + byte[]? 
reply = null; int hdrSize, totalSize; if (argCount == 3) { - subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]); + subject = argsSpan[ranges[0]].ToArray(); hdrSize = ParseSize(argsSpan[ranges[1]]); totalSize = ParseSize(argsSpan[ranges[2]]); } else if (argCount == 4) { - subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]); - reply = Encoding.ASCII.GetString(argsSpan[ranges[1]]); + subject = argsSpan[ranges[0]].ToArray(); + reply = argsSpan[ranges[1]].ToArray(); hdrSize = ParseSize(argsSpan[ranges[2]]); totalSize = ParseSize(argsSpan[ranges[3]]); } @@ -324,7 +333,7 @@ public sealed class NatsParser return TryReadPayload(ref buffer, out command); } - private bool TryReadPayload(ref ReadOnlySequence buffer, out ParsedCommand command) + private bool TryReadPayload(ref ReadOnlySequence buffer, out ParsedCommandView command) { command = default; @@ -333,10 +342,7 @@ public sealed class NatsParser if (buffer.Length < needed) return false; - // Extract payload var payloadSlice = buffer.Slice(0, _expectedPayloadSize); - var payload = new byte[_expectedPayloadSize]; - payloadSlice.CopyTo(payload); // Verify \r\n after payload var trailer = buffer.Slice(_expectedPayloadSize, 2); @@ -345,23 +351,25 @@ public sealed class NatsParser if (trailerBytes[0] != (byte)'\r' || trailerBytes[1] != (byte)'\n') throw new ProtocolViolationException("Expected \\r\\n after payload"); - command = new ParsedCommand + command = new ParsedCommandView { Type = _pendingType, Operation = _pendingOperation, Subject = _pendingSubject, ReplyTo = _pendingReplyTo, - Payload = payload, + Payload = payloadSlice, HeaderSize = _pendingHeaderSize, MaxMessages = -1, }; buffer = buffer.Slice(buffer.GetPosition(needed)); _awaitingPayload = false; + _pendingSubject = null; + _pendingReplyTo = null; return true; } - private static ParsedCommand ParseSub(Span line) + private static ParsedCommandView ParseSub(Span line) { // SUB subject [queue] sid -- skip "SUB " if (line.Length < 5) @@ -372,28 +380,28 @@ 
public sealed class NatsParser return argCount switch { - 2 => new ParsedCommand + 2 => new ParsedCommandView { Type = CommandType.Sub, Operation = "SUB", - Subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]), - Sid = Encoding.ASCII.GetString(argsSpan[ranges[1]]), + Subject = CopyBytes(argsSpan[ranges[0]]), + Sid = CopyBytes(argsSpan[ranges[1]]), MaxMessages = -1, }, - 3 => new ParsedCommand + 3 => new ParsedCommandView { Type = CommandType.Sub, Operation = "SUB", - Subject = Encoding.ASCII.GetString(argsSpan[ranges[0]]), - Queue = Encoding.ASCII.GetString(argsSpan[ranges[1]]), - Sid = Encoding.ASCII.GetString(argsSpan[ranges[2]]), + Subject = CopyBytes(argsSpan[ranges[0]]), + Queue = CopyBytes(argsSpan[ranges[1]]), + Sid = CopyBytes(argsSpan[ranges[2]]), MaxMessages = -1, }, _ => throw new ProtocolViolationException("Invalid SUB arguments"), }; } - private static ParsedCommand ParseUnsub(Span line) + private static ParsedCommandView ParseUnsub(Span line) { // UNSUB sid [max_msgs] -- skip "UNSUB " if (line.Length < 7) @@ -404,58 +412,59 @@ public sealed class NatsParser return argCount switch { - 1 => new ParsedCommand + 1 => new ParsedCommandView { Type = CommandType.Unsub, Operation = "UNSUB", - Sid = Encoding.ASCII.GetString(argsSpan[ranges[0]]), + Sid = CopyBytes(argsSpan[ranges[0]]), MaxMessages = -1, }, - 2 => new ParsedCommand + 2 => new ParsedCommandView { Type = CommandType.Unsub, Operation = "UNSUB", - Sid = Encoding.ASCII.GetString(argsSpan[ranges[0]]), + Sid = CopyBytes(argsSpan[ranges[0]]), MaxMessages = ParseSize(argsSpan[ranges[1]]), }, _ => throw new ProtocolViolationException("Invalid UNSUB arguments"), }; } - private static ParsedCommand ParseConnect(Span line) + private static ParsedCommandView ParseConnect(ReadOnlySequence line) { - // CONNECT {json} -- find first space after command - int spaceIdx = line.IndexOf((byte)' '); - if (spaceIdx < 0) + var reader = new SequenceReader(line); + if (!reader.TryAdvanceTo((byte)' ', 
advancePastDelimiter: true)) throw new ProtocolViolationException("Invalid CONNECT"); - var json = line[(spaceIdx + 1)..]; - return new ParsedCommand + var json = line.Slice(reader.Position); + return new ParsedCommandView { Type = CommandType.Connect, Operation = "CONNECT", - Payload = json.ToArray(), + Payload = json, MaxMessages = -1, }; } - private static ParsedCommand ParseInfo(Span line) + private static ParsedCommandView ParseInfo(ReadOnlySequence line) { - // INFO {json} -- find first space after command - int spaceIdx = line.IndexOf((byte)' '); - if (spaceIdx < 0) + var reader = new SequenceReader(line); + if (!reader.TryAdvanceTo((byte)' ', advancePastDelimiter: true)) throw new ProtocolViolationException("Invalid INFO"); - var json = line[(spaceIdx + 1)..]; - return new ParsedCommand + var json = line.Slice(reader.Position); + return new ParsedCommandView { Type = CommandType.Info, Operation = "INFO", - Payload = json.ToArray(), + Payload = json, MaxMessages = -1, }; } + private static ReadOnlyMemory CopyBytes(ReadOnlySpan value) => + value.IsEmpty ? ReadOnlyMemory.Empty : value.ToArray(); + /// /// Parse a decimal integer from ASCII bytes. Returns -1 on error. /// diff --git a/src/NATS.Server/Protocol/ParsedCommandView.cs b/src/NATS.Server/Protocol/ParsedCommandView.cs new file mode 100644 index 0000000..0be6db1 --- /dev/null +++ b/src/NATS.Server/Protocol/ParsedCommandView.cs @@ -0,0 +1,42 @@ +using System.Buffers; +using System.Text; + +namespace NATS.Server.Protocol; + +public readonly struct ParsedCommandView +{ + public CommandType Type { get; init; } + public string? 
Operation { get; init; } + public ReadOnlyMemory Subject { get; init; } + public ReadOnlyMemory ReplyTo { get; init; } + public ReadOnlyMemory Queue { get; init; } + public ReadOnlyMemory Sid { get; init; } + public int MaxMessages { get; init; } + public int HeaderSize { get; init; } + public ReadOnlySequence Payload { get; init; } + + public static ParsedCommandView Simple(CommandType type, string operation) => + new() { Type = type, Operation = operation, MaxMessages = -1 }; + + public ReadOnlyMemory GetPayloadMemory() => + Payload.IsEmpty ? ReadOnlyMemory.Empty + : Payload.IsSingleSegment ? Payload.First + : Payload.ToArray(); + + public ParsedCommand Materialize() => + new() + { + Type = Type, + Operation = Operation, + Subject = DecodeAsciiOrNull(Subject), + ReplyTo = DecodeAsciiOrNull(ReplyTo), + Queue = DecodeAsciiOrNull(Queue), + Sid = DecodeAsciiOrNull(Sid), + MaxMessages = MaxMessages, + HeaderSize = HeaderSize, + Payload = GetPayloadMemory(), + }; + + private static string? DecodeAsciiOrNull(ReadOnlyMemory value) => + value.IsEmpty ? 
null : Encoding.ASCII.GetString(value.Span); +} diff --git a/tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj b/tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj index 6290932..4655016 100644 --- a/tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj +++ b/tests/NATS.Server.Benchmark.Tests/NATS.Server.Benchmark.Tests.csproj @@ -23,4 +23,8 @@ + + + + diff --git a/tests/NATS.Server.Benchmark.Tests/Protocol/ParserHotPathBenchmarks.cs b/tests/NATS.Server.Benchmark.Tests/Protocol/ParserHotPathBenchmarks.cs new file mode 100644 index 0000000..0502666 --- /dev/null +++ b/tests/NATS.Server.Benchmark.Tests/Protocol/ParserHotPathBenchmarks.cs @@ -0,0 +1,143 @@ +using System.Buffers; +using System.Diagnostics; +using System.Text; +using NATS.Server.Protocol; +using Xunit.Abstractions; + +namespace NATS.Server.Benchmark.Tests.Protocol; + +public class ParserHotPathBenchmarks(ITestOutputHelper output) +{ + [Fact] + [Trait("Category", "Benchmark")] + public void Parser_PING_Throughput() + { + var payload = "PING\r\n"u8.ToArray(); + MeasureSingleChunk("Parser PING", payload, iterations: 500_000); + } + + [Fact] + [Trait("Category", "Benchmark")] + public void Parser_PUB_Throughput() + { + var payload = "PUB bench.subject 16\r\n0123456789ABCDEF\r\n"u8.ToArray(); + MeasureSingleChunk("Parser PUB", payload, iterations: 250_000); + } + + [Fact] + [Trait("Category", "Benchmark")] + public void Parser_HPUB_Throughput() + { + var payload = "HPUB bench.subject 12 28\r\nNATS/1.0\r\n\r\n0123456789ABCDEF\r\n"u8.ToArray(); + MeasureSingleChunk("Parser HPUB", payload, iterations: 200_000); + } + + [Fact] + [Trait("Category", "Benchmark")] + public void Parser_PUB_SplitPayload_Throughput() + { + var firstChunk = "PUB bench.subject 16\r\n01234567"u8.ToArray(); + var secondChunk = "89ABCDEF\r\n"u8.ToArray(); + MeasureSplitPayload("Parser PUB split payload", firstChunk, secondChunk, iterations: 200_000); + } + + private void 
MeasureSingleChunk(string name, byte[] commandBytes, int iterations) + { + GC.Collect(); + GC.WaitForPendingFinalizers(); + GC.Collect(); + + var parser = new NatsParser(); + var totalBytes = (long)commandBytes.Length * iterations; + var beforeAlloc = GC.GetAllocatedBytesForCurrentThread(); + var stopwatch = Stopwatch.StartNew(); + + for (var i = 0; i < iterations; i++) + { + ReadOnlySequence buffer = new(commandBytes); + if (!parser.TryParseView(ref buffer, out var command)) + throw new InvalidOperationException($"{name} did not produce a parsed command."); + + if (command.Type is CommandType.Pub or CommandType.HPub) + { + var payload = command.GetPayloadMemory(); + if (payload.IsEmpty) + throw new InvalidOperationException($"{name} produced an empty payload unexpectedly."); + } + } + + stopwatch.Stop(); + var allocatedBytes = GC.GetAllocatedBytesForCurrentThread() - beforeAlloc; + WriteResult(name, iterations, totalBytes, stopwatch.Elapsed, allocatedBytes); + } + + private void MeasureSplitPayload(string name, byte[] firstChunkBytes, byte[] secondChunkBytes, int iterations) + { + GC.Collect(); + GC.WaitForPendingFinalizers(); + GC.Collect(); + + var parser = new NatsParser(); + var totalBytes = (long)(firstChunkBytes.Length + secondChunkBytes.Length) * iterations; + var beforeAlloc = GC.GetAllocatedBytesForCurrentThread(); + var stopwatch = Stopwatch.StartNew(); + + for (var i = 0; i < iterations; i++) + { + ReadOnlySequence firstChunk = new(firstChunkBytes); + if (parser.TryParseView(ref firstChunk, out _)) + throw new InvalidOperationException($"{name} should wait for the second payload chunk."); + + ReadOnlySequence secondChunk = CreateSequence(firstChunk.First, secondChunkBytes); + if (!parser.TryParseView(ref secondChunk, out var command)) + throw new InvalidOperationException($"{name} did not complete after the second payload chunk."); + + if (command.GetPayloadMemory().Length != 16) + throw new InvalidOperationException($"{name} produced the wrong payload 
length."); + } + + stopwatch.Stop(); + var allocatedBytes = GC.GetAllocatedBytesForCurrentThread() - beforeAlloc; + WriteResult(name, iterations, totalBytes, stopwatch.Elapsed, allocatedBytes); + } + + private void WriteResult(string name, int iterations, long totalBytes, TimeSpan elapsed, long allocatedBytes) + { + var operationsPerSecond = iterations / elapsed.TotalSeconds; + var megabytesPerSecond = totalBytes / elapsed.TotalSeconds / (1024.0 * 1024.0); + var bytesPerOperation = allocatedBytes / (double)iterations; + + output.WriteLine($"=== {name} ==="); + output.WriteLine($"Ops: {operationsPerSecond:N0} ops/s"); + output.WriteLine($"Data: {megabytesPerSecond:F1} MB/s"); + output.WriteLine($"Alloc: {bytesPerOperation:F1} B/op"); + output.WriteLine($"Elapsed: {elapsed.TotalMilliseconds:F0} ms"); + output.WriteLine(""); + } + + private static ReadOnlySequence CreateSequence(ReadOnlyMemory remainingBytes, byte[] secondChunk) + { + var first = new BufferSegment(remainingBytes); + var second = first.Append(secondChunk); + return new ReadOnlySequence(first, 0, second, second.Memory.Length); + } + + private sealed class BufferSegment : ReadOnlySequenceSegment + { + public BufferSegment(ReadOnlyMemory memory) + { + Memory = memory; + } + + public BufferSegment Append(ReadOnlyMemory memory) + { + var next = new BufferSegment(memory) + { + RunningIndex = RunningIndex + Memory.Length, + }; + + Next = next; + return next; + } + } +} diff --git a/tests/NATS.Server.Core.Tests/ParserTests.cs b/tests/NATS.Server.Core.Tests/ParserTests.cs index b9accb9..7a8ffbc 100644 --- a/tests/NATS.Server.Core.Tests/ParserTests.cs +++ b/tests/NATS.Server.Core.Tests/ParserTests.cs @@ -61,6 +61,16 @@ public class ParserTests Encoding.ASCII.GetString(cmds[0].Payload.ToArray()).ShouldContain("verbose"); } + [Fact] + public async Task Parse_CONNECT_preserves_json_payload_bytes() + { + const string json = "{\"verbose\":false,\"echo\":true}"; + var cmds = await ParseAsync($"CONNECT {json}\r\n"); + 
cmds.ShouldHaveSingleItem(); + cmds[0].Type.ShouldBe(CommandType.Connect); + Encoding.ASCII.GetString(cmds[0].Payload.Span).ShouldBe(json); + } + [Fact] public async Task Parse_SUB_without_queue() { @@ -144,6 +154,31 @@ public class ParserTests cmds[0].Payload.ToArray().ShouldBeEmpty(); } + [Fact] + public async Task Parse_split_PUB_payload_across_reads() + { + var pipe = new Pipe(); + var parser = new NatsParser(maxPayload: NatsProtocol.MaxPayloadSize); + + await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nHe")); + + var first = await pipe.Reader.ReadAsync(); + var firstBuffer = first.Buffer; + parser.TryParse(ref firstBuffer, out _).ShouldBeFalse(); + pipe.Reader.AdvanceTo(firstBuffer.Start, firstBuffer.End); + + await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("llo\r\n")); + pipe.Writer.Complete(); + + var second = await pipe.Reader.ReadAsync(); + var secondBuffer = second.Buffer; + parser.TryParse(ref secondBuffer, out var cmd).ShouldBeTrue(); + cmd.Type.ShouldBe(CommandType.Pub); + cmd.Subject.ShouldBe("foo"); + Encoding.ASCII.GetString(cmd.Payload.Span).ShouldBe("Hello"); + pipe.Reader.AdvanceTo(secondBuffer.Start, secondBuffer.End); + } + [Fact] public async Task Parse_case_insensitive() { @@ -173,6 +208,7 @@ public class ParserTests var cmds = await ParseAsync("INFO {\"server_id\":\"test\"}\r\n"); cmds.ShouldHaveSingleItem(); cmds[0].Type.ShouldBe(CommandType.Info); + Encoding.ASCII.GetString(cmds[0].Payload.Span).ShouldBe("{\"server_id\":\"test\"}"); } // Mirrors Go TestParsePubArg: verifies subject, optional reply, and payload size diff --git a/tests/NATS.Server.Core.Tests/Protocol/ClientProtocolGoParityTests.cs b/tests/NATS.Server.Core.Tests/Protocol/ClientProtocolGoParityTests.cs index 8e0691e..dc58e4d 100644 --- a/tests/NATS.Server.Core.Tests/Protocol/ClientProtocolGoParityTests.cs +++ b/tests/NATS.Server.Core.Tests/Protocol/ClientProtocolGoParityTests.cs @@ -232,6 +232,35 @@ public class ClientProtocolGoParityTests } } + [Fact] + 
public async Task Split_pub_payload_is_delivered_across_client_reads() + { + var (server, port, cts) = await StartServerAsync(); + try + { + using var sub = await ConnectAndPingAsync(port); + using var pub = await ConnectAndPingAsync(port); + + await sub.SendAsync(Encoding.ASCII.GetBytes("SUB foo 1\r\nPING\r\n")); + await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n"); + + await pub.SendAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nHe")); + await Task.Delay(25); + await pub.SendAsync(Encoding.ASCII.GetBytes("llo\r\nPING\r\n")); + await SocketTestHelper.ReadUntilAsync(pub, "PONG\r\n"); + + await sub.SendAsync(Encoding.ASCII.GetBytes("PING\r\n")); + var response = await SocketTestHelper.ReadUntilAsync(sub, "PONG\r\n"); + + response.ShouldContain("MSG foo 1 5\r\nHello\r\n"); + } + finally + { + await cts.CancelAsync(); + server.Dispose(); + } + } + // ========================================================================= // TestTraceMsg — client_test.go:1700 // Tests that trace message formatting truncates correctly. 
diff --git a/tests/NATS.Server.Core.Tests/Protocol/ParserSpanRetentionTests.cs b/tests/NATS.Server.Core.Tests/Protocol/ParserSpanRetentionTests.cs new file mode 100644 index 0000000..209ed52 --- /dev/null +++ b/tests/NATS.Server.Core.Tests/Protocol/ParserSpanRetentionTests.cs @@ -0,0 +1,155 @@ +using System.Buffers; +using System.IO.Pipelines; +using System.Reflection; +using System.Text; +using NATS.Server.Protocol; + +namespace NATS.Server.Core.Tests.ProtocolParity; + +public class ParserSpanRetentionTests +{ + [Fact] + public void TryParseView_exposes_PUB_fields_as_byte_views() + { + var parser = new NatsParser(); + ReadOnlySequence<byte> buffer = new(Encoding.ASCII.GetBytes("PUB foo reply 5\r\nHello\r\n")); + + var parsed = TryParseView(parser, ref buffer, out var view); + + parsed.ShouldBeTrue(); + GetCommandType(view).ShouldBe(CommandType.Pub); + GetAscii(view, "Subject").ShouldBe("foo"); + GetAscii(view, "ReplyTo").ShouldBe("reply"); + GetAscii(view, "Payload").ShouldBe("Hello"); + GetPropertyType(view, "Subject").ShouldNotBe(typeof(string)); + GetPropertyType(view, "ReplyTo").ShouldNotBe(typeof(string)); + } + + [Fact] + public void TryParseView_exposes_HPUB_fields_as_byte_views() + { + const string header = "NATS/1.0\r\n\r\n"; + const string payload = "Hello"; + var total = header.Length + payload.Length; + var parser = new NatsParser(); + ReadOnlySequence<byte> buffer = new(Encoding.ASCII.GetBytes( + $"HPUB foo reply {header.Length} {total}\r\n{header}{payload}\r\n")); + + var parsed = TryParseView(parser, ref buffer, out var view); + + parsed.ShouldBeTrue(); + GetCommandType(view).ShouldBe(CommandType.HPub); + GetAscii(view, "Subject").ShouldBe("foo"); + GetAscii(view, "ReplyTo").ShouldBe("reply"); + GetAscii(view, "Payload").ShouldBe(header + payload); + GetInt(view, "HeaderSize").ShouldBe(header.Length); + GetPropertyType(view, "Payload").ShouldNotBe(typeof(byte[])); + } + + [Fact] + public void TryParseView_exposes_CONNECT_payload_as_byte_view() + { + const 
string json = "{\"verbose\":false,\"echo\":true}"; + var parser = new NatsParser(); + ReadOnlySequence<byte> buffer = new(Encoding.ASCII.GetBytes($"CONNECT {json}\r\n")); + + var parsed = TryParseView(parser, ref buffer, out var view); + + parsed.ShouldBeTrue(); + GetCommandType(view).ShouldBe(CommandType.Connect); + GetAscii(view, "Payload").ShouldBe(json); + GetPropertyType(view, "Payload").ShouldNotBe(typeof(byte[])); + } + + [Fact] + public void TryParseView_exposes_INFO_payload_as_byte_view() + { + const string json = "{\"server_id\":\"test\"}"; + var parser = new NatsParser(); + ReadOnlySequence<byte> buffer = new(Encoding.ASCII.GetBytes($"INFO {json}\r\n")); + + var parsed = TryParseView(parser, ref buffer, out var view); + + parsed.ShouldBeTrue(); + GetCommandType(view).ShouldBe(CommandType.Info); + GetAscii(view, "Payload").ShouldBe(json); + GetPropertyType(view, "Payload").ShouldNotBe(typeof(byte[])); + } + + [Fact] + public async Task TryParseView_preserves_split_payload_state_across_reads() + { + var parser = new NatsParser(); + var pipe = new Pipe(); + + await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nHe")); + + var first = await pipe.Reader.ReadAsync(); + var firstBuffer = first.Buffer; + TryParseView(parser, ref firstBuffer, out _).ShouldBeFalse(); + pipe.Reader.AdvanceTo(firstBuffer.Start, firstBuffer.End); + + await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("llo\r\n")); + pipe.Writer.Complete(); + + var second = await pipe.Reader.ReadAsync(); + var secondBuffer = second.Buffer; + TryParseView(parser, ref secondBuffer, out var view).ShouldBeTrue(); + GetCommandType(view).ShouldBe(CommandType.Pub); + GetAscii(view, "Subject").ShouldBe("foo"); + GetAscii(view, "Payload").ShouldBe("Hello"); + pipe.Reader.AdvanceTo(secondBuffer.Start, secondBuffer.End); + } + + private static bool TryParseView(NatsParser parser, ref ReadOnlySequence<byte> buffer, out object view) + { + var method = typeof(NatsParser).GetMethod( + "TryParseView", + 
BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + + method.ShouldNotBeNull("NatsParser should expose a byte-first TryParseView API."); + + object?[] args = + [ + buffer, + null, + ]; + + var parsed = (bool)method!.Invoke(parser, args)!; + buffer = (ReadOnlySequence<byte>)args[0]!; + view = args[1]!; + return parsed; + } + + private static CommandType GetCommandType(object view) => + (CommandType)GetRequiredProperty(view, "Type").GetValue(view)!; + + private static int GetInt(object view, string propertyName) => + (int)GetRequiredProperty(view, propertyName).GetValue(view)!; + + private static Type GetPropertyType(object view, string propertyName) => + GetRequiredProperty(view, propertyName).PropertyType; + + private static string GetAscii(object view, string propertyName) + { + var property = GetRequiredProperty(view, propertyName); + var value = property.GetValue(view); + + return value switch + { + ReadOnlyMemory<byte> memory => Encoding.ASCII.GetString(memory.Span), + ReadOnlySequence<byte> sequence => Encoding.ASCII.GetString(sequence.ToArray()), + byte[] bytes => Encoding.ASCII.GetString(bytes), + null => string.Empty, + _ => throw new InvalidOperationException( + $"Unsupported property type for {propertyName}: {property.PropertyType}"), + }; + } + + private static PropertyInfo GetRequiredProperty(object view, string propertyName) + { + var property = view.GetType().GetProperty(propertyName, BindingFlags.Instance | BindingFlags.Public); + property.ShouldNotBeNull($"Expected property {propertyName} on {view.GetType().Name}."); + return property!; + } +} diff --git a/tests/NATS.Server.Core.Tests/Protocol/ProtocolParserSnippetGapParityTests.cs b/tests/NATS.Server.Core.Tests/Protocol/ProtocolParserSnippetGapParityTests.cs index f2831f4..6bfc1e0 100644 --- a/tests/NATS.Server.Core.Tests/Protocol/ProtocolParserSnippetGapParityTests.cs +++ b/tests/NATS.Server.Core.Tests/Protocol/ProtocolParserSnippetGapParityTests.cs @@ -42,4 +42,15 @@ public class 
ProtocolParserSnippetGapParityTests ex.Message.ShouldContain("Maximum control line exceeded"); ex.Message.ShouldContain("snip="); } + + [Fact] + public void Parse_invalid_payload_trailer_preserves_existing_error_message() + { + var parser = new NatsParser(); + var input = Encoding.ASCII.GetBytes("PUB foo 5\r\nHelloXX"); + ReadOnlySequence<byte> buffer = new(input); + + var ex = Should.Throw<ProtocolViolationException>(() => parser.TryParse(ref buffer, out _)); + ex.Message.ShouldBe("Expected \\r\\n after payload"); + } }