156 lines
5.8 KiB
C#
156 lines
5.8 KiB
C#
using System.Buffers;
|
|
using System.IO.Pipelines;
|
|
using System.Reflection;
|
|
using System.Text;
|
|
using NATS.Server.Protocol;
|
|
|
|
namespace NATS.Server.Core.Tests.ProtocolParity;
|
|
|
|
/// <summary>
/// Exercises <c>NatsParser.TryParseView</c> and checks that the parsed command exposes
/// its subject, reply-to and payload through byte-oriented properties rather than
/// <see cref="string"/> or <c>byte[]</c>.
/// </summary>
/// <remarks>
/// Both the parser method and the properties of the returned view are resolved through
/// reflection; a missing member fails the test with a descriptive assertion message.
/// </remarks>
public class ParserSpanRetentionTests
{
    [Fact]
    public void TryParseView_exposes_PUB_fields_as_byte_views()
    {
        var parser = new NatsParser();
        ReadOnlySequence<byte> buffer = new(Encoding.ASCII.GetBytes("PUB foo reply 5\r\nHello\r\n"));

        var parsed = TryParseView(parser, ref buffer, out var view);

        parsed.ShouldBeTrue();
        GetCommandType(view).ShouldBe(CommandType.Pub);
        GetAscii(view, "Subject").ShouldBe("foo");
        GetAscii(view, "ReplyTo").ShouldBe("reply");
        GetAscii(view, "Payload").ShouldBe("Hello");

        // The view must not have materialised the routing fields as strings.
        GetPropertyType(view, "Subject").ShouldNotBe(typeof(string));
        GetPropertyType(view, "ReplyTo").ShouldNotBe(typeof(string));
    }

    [Fact]
    public void TryParseView_exposes_HPUB_fields_as_byte_views()
    {
        const string header = "NATS/1.0\r\n\r\n";
        const string payload = "Hello";

        // Everything here is ASCII, so character counts equal byte counts.
        var total = header.Length + payload.Length;
        var parser = new NatsParser();
        ReadOnlySequence<byte> buffer = new(Encoding.ASCII.GetBytes(
            $"HPUB foo reply {header.Length} {total}\r\n{header}{payload}\r\n"));

        var parsed = TryParseView(parser, ref buffer, out var view);

        parsed.ShouldBeTrue();
        GetCommandType(view).ShouldBe(CommandType.HPub);
        GetAscii(view, "Subject").ShouldBe("foo");
        GetAscii(view, "ReplyTo").ShouldBe("reply");

        // Payload is expected to span the header block plus the body; HeaderSize marks the split.
        GetAscii(view, "Payload").ShouldBe(header + payload);
        GetInt(view, "HeaderSize").ShouldBe(header.Length);
        GetPropertyType(view, "Payload").ShouldNotBe(typeof(byte[]));
    }

    [Fact]
    public void TryParseView_exposes_CONNECT_payload_as_byte_view()
    {
        const string json = "{\"verbose\":false,\"echo\":true}";
        var parser = new NatsParser();
        ReadOnlySequence<byte> buffer = new(Encoding.ASCII.GetBytes($"CONNECT {json}\r\n"));

        var parsed = TryParseView(parser, ref buffer, out var view);

        parsed.ShouldBeTrue();
        GetCommandType(view).ShouldBe(CommandType.Connect);
        GetAscii(view, "Payload").ShouldBe(json);
        GetPropertyType(view, "Payload").ShouldNotBe(typeof(byte[]));
    }

    [Fact]
    public void TryParseView_exposes_INFO_payload_as_byte_view()
    {
        const string json = "{\"server_id\":\"test\"}";
        var parser = new NatsParser();
        ReadOnlySequence<byte> buffer = new(Encoding.ASCII.GetBytes($"INFO {json}\r\n"));

        var parsed = TryParseView(parser, ref buffer, out var view);

        parsed.ShouldBeTrue();
        GetCommandType(view).ShouldBe(CommandType.Info);
        GetAscii(view, "Payload").ShouldBe(json);
        GetPropertyType(view, "Payload").ShouldNotBe(typeof(byte[]));
    }

    [Fact]
    public async Task TryParseView_preserves_split_payload_state_across_reads()
    {
        var parser = new NatsParser();
        var pipe = new Pipe();

        // First chunk: the complete control line but only two of the five payload bytes.
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("PUB foo 5\r\nHe"));

        var first = await pipe.Reader.ReadAsync();
        var firstBuffer = first.Buffer;
        TryParseView(parser, ref firstBuffer, out _).ShouldBeFalse();

        // firstBuffer now holds whatever the parser handed back through the ref argument:
        // everything before its start is reported as consumed, everything up to its end as examined.
        pipe.Reader.AdvanceTo(firstBuffer.Start, firstBuffer.End);

        // Second chunk: the rest of the payload plus the trailing CRLF.
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("llo\r\n"));
        await pipe.Writer.CompleteAsync();

        var second = await pipe.Reader.ReadAsync();
        var secondBuffer = second.Buffer;
        TryParseView(parser, ref secondBuffer, out var view).ShouldBeTrue();

        // Inspect the view before advancing the reader, in case it still points at the pipe's buffers.
        GetCommandType(view).ShouldBe(CommandType.Pub);
        GetAscii(view, "Subject").ShouldBe("foo");
        GetAscii(view, "Payload").ShouldBe("Hello");
        pipe.Reader.AdvanceTo(secondBuffer.Start, secondBuffer.End);

        // Complete the reader as well as the writer so the pipe can release its buffers.
        await pipe.Reader.CompleteAsync();
    }

    /// <summary>
    /// Invokes the instance method named <c>TryParseView</c> on <see cref="NatsParser"/> through reflection.
    /// </summary>
    /// <param name="parser">Parser instance to invoke the method on.</param>
    /// <param name="buffer">
    /// Input bytes; replaced on return with the value found in argument slot 0 after the call.
    /// </param>
    /// <param name="view">Receives the value found in argument slot 1 after the call.</param>
    /// <returns>The boolean returned by the reflected method.</returns>
    private static bool TryParseView(NatsParser parser, ref ReadOnlySequence<byte> buffer, out object view)
    {
        var method = typeof(NatsParser).GetMethod(
            "TryParseView",
            BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);

        method.ShouldNotBeNull("NatsParser should expose a byte-first TryParseView API.");

        // Reflection writes by-ref and out arguments back into this array, which is why
        // both slots are read again once Invoke returns.
        object?[] args =
        [
            buffer,
            null,
        ];

        var parsed = (bool)method!.Invoke(parser, args)!;
        buffer = (ReadOnlySequence<byte>)args[0]!;
        view = args[1]!;
        return parsed;
    }

    /// <summary>Reads the view's <c>Type</c> property and casts it to <see cref="CommandType"/>.</summary>
    private static CommandType GetCommandType(object view) =>
        (CommandType)GetRequiredProperty(view, "Type").GetValue(view)!;

    /// <summary>Reads the named property and unboxes it as an <see cref="int"/>.</summary>
    private static int GetInt(object view, string propertyName) =>
        (int)GetRequiredProperty(view, propertyName).GetValue(view)!;

    /// <summary>Returns the declared type of the named property on the view.</summary>
    private static Type GetPropertyType(object view, string propertyName) =>
        GetRequiredProperty(view, propertyName).PropertyType;

    /// <summary>
    /// Reads the named property and decodes its bytes as ASCII text.
    /// </summary>
    /// <remarks>
    /// Handles <c>ReadOnlyMemory&lt;byte&gt;</c>, <c>ReadOnlySequence&lt;byte&gt;</c> and <c>byte[]</c> values;
    /// a <c>null</c> value yields an empty string.
    /// </remarks>
    /// <exception cref="InvalidOperationException">The property value has any other type.</exception>
    private static string GetAscii(object view, string propertyName)
    {
        var property = GetRequiredProperty(view, propertyName);
        var value = property.GetValue(view);

        return value switch
        {
            ReadOnlyMemory<byte> memory => Encoding.ASCII.GetString(memory.Span),
            ReadOnlySequence<byte> sequence => Encoding.ASCII.GetString(sequence.ToArray()),
            byte[] bytes => Encoding.ASCII.GetString(bytes),
            null => string.Empty,
            _ => throw new InvalidOperationException(
                $"Unsupported property type for {propertyName}: {property.PropertyType}"),
        };
    }

    /// <summary>
    /// Looks up a public instance property on the view's runtime type and asserts that it exists.
    /// </summary>
    private static PropertyInfo GetRequiredProperty(object view, string propertyName)
    {
        var property = view.GetType().GetProperty(propertyName, BindingFlags.Instance | BindingFlags.Public);
        property.ShouldNotBeNull($"Expected property {propertyName} on {view.GetType().Name}.");
        return property!;
    }
}
|