From 98cbdbdeb882fedf1a86a7601752727e617e2c5d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 09:51:17 -0400 Subject: [PATCH] test: lock parser span-retention behavior --- tests/NATS.Server.Core.Tests/ParserTests.cs | 36 ++++ .../Protocol/ParserSpanRetentionTests.cs | 155 ++++++++++++++++++ .../ProtocolParserSnippetGapParityTests.cs | 11 ++ 3 files changed, 202 insertions(+) create mode 100644 tests/NATS.Server.Core.Tests/Protocol/ParserSpanRetentionTests.cs diff --git a/tests/NATS.Server.Core.Tests/ParserTests.cs b/tests/NATS.Server.Core.Tests/ParserTests.cs index b9accb9..7a8ffbc 100644 --- a/tests/NATS.Server.Core.Tests/ParserTests.cs +++ b/tests/NATS.Server.Core.Tests/ParserTests.cs @@ -61,6 +61,16 @@ public class ParserTests Encoding.ASCII.GetString(cmds[0].Payload.ToArray()).ShouldContain("verbose"); } + [Fact] + public async Task Parse_CONNECT_preserves_json_payload_bytes() + { + const string json = "{\"verbose\":false,\"echo\":true}"; + var cmds = await ParseAsync($"CONNECT {json}\r\n"); + cmds.ShouldHaveSingleItem(); + cmds[0].Type.ShouldBe(CommandType.Connect); + Encoding.ASCII.GetString(cmds[0].Payload.Span).ShouldBe(json); + } + [Fact] public async Task Parse_SUB_without_queue() { @@ -144,6 +154,31 @@ public class ParserTests cmds[0].Payload.ToArray().ShouldBeEmpty(); } + [Fact] + public async Task Parse_split_PUB_payload_across_reads() + { + var pipe = new Pipe(); + var parser = new NatsParser(maxPayload: NatsProtocol.MaxPayloadSize); + + await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nHe")); + + var first = await pipe.Reader.ReadAsync(); + var firstBuffer = first.Buffer; + parser.TryParse(ref firstBuffer, out _).ShouldBeFalse(); + pipe.Reader.AdvanceTo(firstBuffer.Start, firstBuffer.End); + + await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("llo\r\n")); + pipe.Writer.Complete(); + + var second = await pipe.Reader.ReadAsync(); + var secondBuffer = second.Buffer; + parser.TryParse(ref secondBuffer, out var cmd).ShouldBeTrue(); + cmd.Type.ShouldBe(CommandType.Pub); + cmd.Subject.ShouldBe("foo"); + Encoding.ASCII.GetString(cmd.Payload.Span).ShouldBe("Hello"); + pipe.Reader.AdvanceTo(secondBuffer.Start, secondBuffer.End); + } + [Fact] public async Task Parse_case_insensitive() { @@ -173,6 +208,7 @@ public class ParserTests var cmds = await ParseAsync("INFO {\"server_id\":\"test\"}\r\n"); cmds.ShouldHaveSingleItem(); cmds[0].Type.ShouldBe(CommandType.Info); + Encoding.ASCII.GetString(cmds[0].Payload.Span).ShouldBe("{\"server_id\":\"test\"}"); } // Mirrors Go TestParsePubArg: verifies subject, optional reply, and payload size diff --git a/tests/NATS.Server.Core.Tests/Protocol/ParserSpanRetentionTests.cs b/tests/NATS.Server.Core.Tests/Protocol/ParserSpanRetentionTests.cs new file mode 100644 index 0000000..209ed52 --- /dev/null +++ b/tests/NATS.Server.Core.Tests/Protocol/ParserSpanRetentionTests.cs @@ -0,0 +1,155 @@ +using System.Buffers; +using System.IO.Pipelines; +using System.Reflection; +using System.Text; +using NATS.Server.Protocol; + +namespace NATS.Server.Core.Tests.ProtocolParity; + +public class ParserSpanRetentionTests +{ + [Fact] + public void TryParseView_exposes_PUB_fields_as_byte_views() + { + var parser = new NatsParser(); + ReadOnlySequence buffer = new(Encoding.ASCII.GetBytes("PUB foo reply 5\r\nHello\r\n")); + + var parsed = TryParseView(parser, ref buffer, out var view); + + parsed.ShouldBeTrue(); + GetCommandType(view).ShouldBe(CommandType.Pub); + GetAscii(view, "Subject").ShouldBe("foo"); + GetAscii(view, "ReplyTo").ShouldBe("reply"); + GetAscii(view, "Payload").ShouldBe("Hello"); + GetPropertyType(view, "Subject").ShouldNotBe(typeof(string)); + GetPropertyType(view, "ReplyTo").ShouldNotBe(typeof(string)); + } + + [Fact] + public void TryParseView_exposes_HPUB_fields_as_byte_views() + { + const string header = "NATS/1.0\r\n\r\n"; + const string payload = "Hello"; + var total = header.Length + payload.Length; + var parser = new NatsParser(); + ReadOnlySequence buffer = new(Encoding.ASCII.GetBytes( + $"HPUB foo reply {header.Length} {total}\r\n{header}{payload}\r\n")); + + var parsed = TryParseView(parser, ref buffer, out var view); + + parsed.ShouldBeTrue(); + GetCommandType(view).ShouldBe(CommandType.HPub); + GetAscii(view, "Subject").ShouldBe("foo"); + GetAscii(view, "ReplyTo").ShouldBe("reply"); + GetAscii(view, "Payload").ShouldBe(header + payload); + GetInt(view, "HeaderSize").ShouldBe(header.Length); + GetPropertyType(view, "Payload").ShouldNotBe(typeof(byte[])); + } + + [Fact] + public void TryParseView_exposes_CONNECT_payload_as_byte_view() + { + const string json = "{\"verbose\":false,\"echo\":true}"; + var parser = new NatsParser(); + ReadOnlySequence buffer = new(Encoding.ASCII.GetBytes($"CONNECT {json}\r\n")); + + var parsed = TryParseView(parser, ref buffer, out var view); + + parsed.ShouldBeTrue(); + GetCommandType(view).ShouldBe(CommandType.Connect); + GetAscii(view, "Payload").ShouldBe(json); + GetPropertyType(view, "Payload").ShouldNotBe(typeof(byte[])); + } + + [Fact] + public void TryParseView_exposes_INFO_payload_as_byte_view() + { + const string json = "{\"server_id\":\"test\"}"; + var parser = new NatsParser(); + ReadOnlySequence buffer = new(Encoding.ASCII.GetBytes($"INFO {json}\r\n")); + + var parsed = TryParseView(parser, ref buffer, out var view); + + parsed.ShouldBeTrue(); + GetCommandType(view).ShouldBe(CommandType.Info); + GetAscii(view, "Payload").ShouldBe(json); + GetPropertyType(view, "Payload").ShouldNotBe(typeof(byte[])); + } + + [Fact] + public async Task TryParseView_preserves_split_payload_state_across_reads() + { + var parser = new NatsParser(); + var pipe = new Pipe(); + + await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nHe")); + + var first = await pipe.Reader.ReadAsync(); + var firstBuffer = first.Buffer; + TryParseView(parser, ref firstBuffer, out _).ShouldBeFalse(); + pipe.Reader.AdvanceTo(firstBuffer.Start, firstBuffer.End); + + await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("llo\r\n")); + pipe.Writer.Complete(); + + var second = await pipe.Reader.ReadAsync(); + var secondBuffer = second.Buffer; + TryParseView(parser, ref secondBuffer, out var view).ShouldBeTrue(); + GetCommandType(view).ShouldBe(CommandType.Pub); + GetAscii(view, "Subject").ShouldBe("foo"); + GetAscii(view, "Payload").ShouldBe("Hello"); + pipe.Reader.AdvanceTo(secondBuffer.Start, secondBuffer.End); + } + + private static bool TryParseView(NatsParser parser, ref ReadOnlySequence buffer, out object view) + { + var method = typeof(NatsParser).GetMethod( + "TryParseView", + BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic); + + method.ShouldNotBeNull("NatsParser should expose a byte-first TryParseView API."); + + object?[] args = + [ + buffer, + null, + ]; + + var parsed = (bool)method!.Invoke(parser, args)!; + buffer = (ReadOnlySequence)args[0]!; + view = args[1]!; + return parsed; + } + + private static CommandType GetCommandType(object view) => + (CommandType)GetRequiredProperty(view, "Type").GetValue(view)!; + + private static int GetInt(object view, string propertyName) => + (int)GetRequiredProperty(view, propertyName).GetValue(view)!; + + private static Type GetPropertyType(object view, string propertyName) => + GetRequiredProperty(view, propertyName).PropertyType; + + private static string GetAscii(object view, string propertyName) + { + var property = GetRequiredProperty(view, propertyName); + var value = property.GetValue(view); + + return value switch + { + ReadOnlyMemory memory => Encoding.ASCII.GetString(memory.Span), + ReadOnlySequence sequence => Encoding.ASCII.GetString(sequence.ToArray()), + byte[] bytes => Encoding.ASCII.GetString(bytes), + null => string.Empty, + _ => throw new InvalidOperationException( + $"Unsupported property type for {propertyName}: {property.PropertyType}"), + }; + } + + private static PropertyInfo GetRequiredProperty(object view, string propertyName) + { + var property = view.GetType().GetProperty(propertyName, BindingFlags.Instance | BindingFlags.Public); + property.ShouldNotBeNull($"Expected property {propertyName} on {view.GetType().Name}."); + return property!; + } +} diff --git a/tests/NATS.Server.Core.Tests/Protocol/ProtocolParserSnippetGapParityTests.cs b/tests/NATS.Server.Core.Tests/Protocol/ProtocolParserSnippetGapParityTests.cs index f2831f4..6bfc1e0 100644 --- a/tests/NATS.Server.Core.Tests/Protocol/ProtocolParserSnippetGapParityTests.cs +++ b/tests/NATS.Server.Core.Tests/Protocol/ProtocolParserSnippetGapParityTests.cs @@ -42,4 +42,15 @@ public class ProtocolParserSnippetGapParityTests ex.Message.ShouldContain("Maximum control line exceeded"); ex.Message.ShouldContain("snip="); } + + [Fact] + public void Parse_invalid_payload_trailer_preserves_existing_error_message() + { + var parser = new NatsParser(); + var input = Encoding.ASCII.GetBytes("PUB foo 5\r\nHelloXX"); + ReadOnlySequence buffer = new(input); + + var ex = Should.Throw(() => parser.TryParse(ref buffer, out _)); + ex.Message.ShouldBe("Expected \\r\\n after payload"); + } }