feat: port session 08 — Client Connection & PROXY Protocol
- ClientConnection: full connection lifecycle, string/identity helpers, SplitSubjectQueue, KindString, MsgParts, SetHeader, message header manipulation (GenHeader, RemoveHeader, SliceHeader, GetHeader) - ClientTypes: ClientConnectionType, ClientProtocol, ClientFlags, ReadCacheFlags, ClosedState, PmrFlags, DenyType, ClientOptions, ClientInfo, NbPool, RouteTarget, ClientKindHelpers - NatsMessageHeaders: complete header utility class (GenHeader, RemoveHeaderIfPrefixPresent, RemoveHeaderIfPresent, SliceHeader, GetHeader, SetHeader, GetHeaderKeyIndex) - ProxyProtocol: PROXY protocol v1/v2 parser (ReadV1Header, ParseV2Header, ReadProxyProtoHeader sync entry point) - ServerErrors: add ErrAuthorization sentinel - Tests: 32 standalone unit tests (proxy protocol: IDs 159-168, 171-178, 180-181; client: IDs 200-201, 247-256) - DB: 195 features → complete (387-581); 32 tests → complete; 81 server-dependent tests → n/a Features: 667 complete, 274 unit tests complete (17.2% overall)
This commit is contained in:
604
dotnet/src/ZB.MOM.NatsNet.Server/Protocol/ProxyProtocol.cs
Normal file
604
dotnet/src/ZB.MOM.NatsNet.Server/Protocol/ProxyProtocol.cs
Normal file
@@ -0,0 +1,604 @@
|
||||
// Copyright 2025 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/client_proxyproto.go in the NATS server Go source.
|
||||
|
||||
using System.Buffers.Binary;
|
||||
using System.Net;
|
||||
using System.Net.Sockets;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Text;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server.Protocol;
|
||||
|
||||
// ============================================================================
|
||||
// Proxy Protocol v2 constants
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// PROXY protocol v1 and v2 constants.
|
||||
/// Mirrors the const blocks in server/client_proxyproto.go.
|
||||
/// </summary>
|
||||
internal static class ProxyProtoConstants
|
||||
{
|
||||
// v2 signature (12 bytes)
|
||||
internal const string V2Sig = "\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A";
|
||||
|
||||
// Version and command byte masks
|
||||
internal const byte VerMask = 0xF0;
|
||||
internal const byte Ver2 = 0x20;
|
||||
internal const byte CmdMask = 0x0F;
|
||||
internal const byte CmdLocal = 0x00;
|
||||
internal const byte CmdProxy = 0x01;
|
||||
|
||||
// Address family and protocol masks
|
||||
internal const byte FamilyMask = 0xF0;
|
||||
internal const byte FamilyUnspec = 0x00;
|
||||
internal const byte FamilyInet = 0x10;
|
||||
internal const byte FamilyInet6 = 0x20;
|
||||
internal const byte FamilyUnix = 0x30;
|
||||
internal const byte ProtoMask = 0x0F;
|
||||
internal const byte ProtoUnspec = 0x00;
|
||||
internal const byte ProtoStream = 0x01;
|
||||
internal const byte ProtoDatagram = 0x02;
|
||||
|
||||
// Address sizes
|
||||
internal const int AddrSizeIPv4 = 12; // 4+4+2+2
|
||||
internal const int AddrSizeIPv6 = 36; // 16+16+2+2
|
||||
|
||||
// Fixed v2 header size: 12 (sig) + 1 (ver/cmd) + 1 (fam/proto) + 2 (addr len)
|
||||
internal const int V2HeaderSize = 16;
|
||||
|
||||
// Timeout for reading PROXY protocol header
|
||||
internal static readonly TimeSpan ReadTimeout = TimeSpan.FromSeconds(5);
|
||||
|
||||
// v1 constants
|
||||
internal const string V1Prefix = "PROXY ";
|
||||
internal const int V1MaxLineLen = 107;
|
||||
internal const string V1TCP4 = "TCP4";
|
||||
internal const string V1TCP6 = "TCP6";
|
||||
internal const string V1Unknown = "UNKNOWN";
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Well-known errors
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Well-known PROXY protocol errors.
|
||||
/// Mirrors errProxyProtoInvalid, errProxyProtoUnsupported, etc. in client_proxyproto.go.
|
||||
/// </summary>
|
||||
public static class ProxyProtoErrors
|
||||
{
|
||||
public static readonly Exception Invalid = new InvalidDataException("invalid PROXY protocol header");
|
||||
public static readonly Exception Unsupported = new NotSupportedException("unsupported PROXY protocol feature");
|
||||
public static readonly Exception Timeout = new TimeoutException("timeout reading PROXY protocol header");
|
||||
public static readonly Exception Unrecognized = new InvalidDataException("unrecognized PROXY protocol format");
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ProxyProtocolAddress
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Address information extracted from a PROXY protocol header.
|
||||
/// Mirrors Go <c>proxyProtoAddr</c>.
|
||||
/// </summary>
|
||||
public sealed class ProxyProtocolAddress
|
||||
{
|
||||
public IPAddress SrcIp { get; }
|
||||
public ushort SrcPort { get; }
|
||||
public IPAddress DstIp { get; }
|
||||
public ushort DstPort { get; }
|
||||
|
||||
internal ProxyProtocolAddress(IPAddress srcIp, ushort srcPort, IPAddress dstIp, ushort dstPort)
|
||||
{
|
||||
SrcIp = srcIp;
|
||||
SrcPort = srcPort;
|
||||
DstIp = dstIp;
|
||||
DstPort = dstPort;
|
||||
}
|
||||
|
||||
/// <summary>Returns "srcIP:srcPort". Mirrors <c>proxyProtoAddr.String()</c>.</summary>
|
||||
public string String() => FormatEndpoint(SrcIp, SrcPort);
|
||||
|
||||
/// <summary>Returns "tcp4" or "tcp6". Mirrors <c>proxyProtoAddr.Network()</c>.</summary>
|
||||
public string Network() => SrcIp.IsIPv4MappedToIPv6 || SrcIp.AddressFamily == AddressFamily.InterNetwork
|
||||
? "tcp4"
|
||||
: "tcp6";
|
||||
|
||||
private static string FormatEndpoint(IPAddress ip, ushort port)
|
||||
{
|
||||
// Match Go net.JoinHostPort — wraps IPv6 in brackets.
|
||||
var addr = ip.AddressFamily == AddressFamily.InterNetworkV6
|
||||
? $"[{ip}]"
|
||||
: ip.ToString();
|
||||
return $"{addr}:{port}";
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ProxyProtocolConnection
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Wraps a <see cref="Stream"/>/<see cref="Socket"/> to override the remote endpoint
|
||||
/// with the address extracted from the PROXY protocol header.
|
||||
/// Mirrors Go <c>proxyConn</c>.
|
||||
/// </summary>
|
||||
public sealed class ProxyProtocolConnection
|
||||
{
|
||||
private readonly Stream _inner;
|
||||
|
||||
/// <summary>The underlying connection stream.</summary>
|
||||
public Stream InnerStream => _inner;
|
||||
|
||||
/// <summary>The proxied remote address (extracted from the header).</summary>
|
||||
public ProxyProtocolAddress RemoteAddress { get; }
|
||||
|
||||
internal ProxyProtocolConnection(Stream inner, ProxyProtocolAddress remoteAddr)
|
||||
{
|
||||
_inner = inner;
|
||||
RemoteAddress = remoteAddr;
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// ProxyProtocolParser (static)
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Reads and parses PROXY protocol v1 and v2 headers from a <see cref="Stream"/>.
|
||||
/// Mirrors the functions in server/client_proxyproto.go.
|
||||
/// </summary>
|
||||
public static class ProxyProtocolParser
|
||||
{
|
||||
// -------------------------------------------------------------------------
|
||||
// Public entry points
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Reads and parses a PROXY protocol (v1 or v2) header from <paramref name="stream"/>.
|
||||
/// Returns <c>null</c> for LOCAL/UNKNOWN health-check commands.
|
||||
/// Mirrors Go <c>readProxyProtoHeader</c>.
|
||||
/// </summary>
|
||||
public static async Task<ProxyProtocolAddress?> ReadProxyProtoHeaderAsync(
|
||||
Stream stream,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
cts.CancelAfter(ProxyProtoConstants.ReadTimeout);
|
||||
var ct = cts.Token;
|
||||
|
||||
// Detect version by reading first 6 bytes.
|
||||
var (version, firstBytes, err) = await DetectVersionAsync(stream, ct).ConfigureAwait(false);
|
||||
if (err is not null) throw err;
|
||||
|
||||
switch (version)
|
||||
{
|
||||
case 1:
|
||||
return await ReadV1HeaderAsync(stream, ct).ConfigureAwait(false);
|
||||
|
||||
case 2:
|
||||
{
|
||||
// Read remaining 6 bytes of signature (bytes 6–11).
|
||||
var remaining = new byte[6];
|
||||
await ReadFullAsync(stream, remaining, ct).ConfigureAwait(false);
|
||||
|
||||
// Verify full signature.
|
||||
var fullSig = Encoding.Latin1.GetString(firstBytes) + Encoding.Latin1.GetString(remaining);
|
||||
if (fullSig != ProxyProtoConstants.V2Sig)
|
||||
throw Wrap(ProxyProtoErrors.Invalid, "invalid signature");
|
||||
|
||||
// Read 4 bytes: ver/cmd, fam/proto, addr-len (2 bytes).
|
||||
var header = new byte[4];
|
||||
await ReadFullAsync(stream, header, ct).ConfigureAwait(false);
|
||||
|
||||
return await ParseV2HeaderAsync(stream, header, ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
default:
|
||||
throw new InvalidOperationException($"unsupported PROXY protocol version: {version}");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reads and parses a PROXY protocol (v1 or v2) header, synchronously.
|
||||
/// Returns <c>null</c> for LOCAL/UNKNOWN health-check commands.
|
||||
/// Mirrors Go <c>readProxyProtoHeader</c>.
|
||||
/// </summary>
|
||||
public static ProxyProtocolAddress? ReadProxyProtoHeader(Stream stream)
|
||||
{
|
||||
var (version, firstBytes) = DetectVersion(stream); // throws Unrecognized if unknown
|
||||
|
||||
if (version == 1)
|
||||
return ReadV1Header(stream);
|
||||
|
||||
// version == 2
|
||||
// Read remaining 6 bytes of the v2 signature (bytes 6–11).
|
||||
var remaining = new byte[6];
|
||||
ReadFull(stream, remaining);
|
||||
|
||||
// Verify the full 12-byte v2 signature.
|
||||
var fullSig = Encoding.Latin1.GetString(firstBytes) + Encoding.Latin1.GetString(remaining);
|
||||
if (fullSig != ProxyProtoConstants.V2Sig)
|
||||
throw Wrap(ProxyProtoErrors.Invalid, "invalid v2 signature");
|
||||
|
||||
// Read 4 bytes: ver/cmd, fam/proto, addr-len (2 bytes).
|
||||
var header = new byte[4];
|
||||
ReadFull(stream, header);
|
||||
|
||||
return ParseV2Header(stream, header.AsSpan());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reads a PROXY protocol v2 header from a raw byte buffer (test-friendly synchronous version).
|
||||
/// Mirrors Go <c>readProxyProtoV2Header</c>.
|
||||
/// </summary>
|
||||
public static ProxyProtocolAddress? ReadProxyProtoV2Header(Stream stream)
|
||||
{
|
||||
// Set a read deadline by not blocking beyond a reasonable time.
|
||||
// In the synchronous version we rely on a cancellation token internally.
|
||||
using var cts = new CancellationTokenSource(ProxyProtoConstants.ReadTimeout);
|
||||
|
||||
// Read fixed header (16 bytes).
|
||||
var header = new byte[ProxyProtoConstants.V2HeaderSize];
|
||||
ReadFull(stream, header);
|
||||
|
||||
// Validate signature (first 12 bytes).
|
||||
if (Encoding.Latin1.GetString(header, 0, 12) != ProxyProtoConstants.V2Sig)
|
||||
throw Wrap(ProxyProtoErrors.Invalid, "invalid signature");
|
||||
|
||||
// Parse after signature: bytes 12-15 (ver/cmd, fam/proto, addr-len).
|
||||
return ParseV2Header(stream, header.AsSpan(12, 4));
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Internal: version detection
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
internal static async Task<(int version, byte[] firstBytes, Exception? err)> DetectVersionAsync(
|
||||
Stream stream, CancellationToken ct)
|
||||
{
|
||||
var buf = new byte[6];
|
||||
try
|
||||
{
|
||||
await ReadFullAsync(stream, buf, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return (0, buf, new IOException("failed to read protocol version", ex));
|
||||
}
|
||||
|
||||
var s = Encoding.Latin1.GetString(buf);
|
||||
if (s == ProxyProtoConstants.V1Prefix)
|
||||
return (1, buf, null);
|
||||
if (s == ProxyProtoConstants.V2Sig[..6])
|
||||
return (2, buf, null);
|
||||
|
||||
return (0, buf, ProxyProtoErrors.Unrecognized);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Synchronous version of version detection — used by test code.
|
||||
/// Mirrors Go <c>detectProxyProtoVersion</c>.
|
||||
/// </summary>
|
||||
internal static (int version, byte[] firstBytes) DetectVersion(Stream stream)
|
||||
{
|
||||
var buf = new byte[6];
|
||||
ReadFull(stream, buf);
|
||||
|
||||
var s = Encoding.Latin1.GetString(buf);
|
||||
if (s == ProxyProtoConstants.V1Prefix)
|
||||
return (1, buf);
|
||||
if (s == ProxyProtoConstants.V2Sig[..6])
|
||||
return (2, buf);
|
||||
|
||||
throw ProxyProtoErrors.Unrecognized;
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Internal: v1 parser
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
internal static async Task<ProxyProtocolAddress?> ReadV1HeaderAsync(Stream stream, CancellationToken ct)
|
||||
{
|
||||
// "PROXY " prefix was already consumed (6 bytes).
|
||||
int maxRemaining = ProxyProtoConstants.V1MaxLineLen - 6;
|
||||
var buf = new byte[maxRemaining];
|
||||
int total = 0;
|
||||
int crlfAt = -1;
|
||||
|
||||
while (total < maxRemaining)
|
||||
{
|
||||
var segment = buf.AsMemory(total);
|
||||
int n = await stream.ReadAsync(segment, ct).ConfigureAwait(false);
|
||||
if (n == 0) throw new EndOfStreamException("failed to read v1 line");
|
||||
total += n;
|
||||
|
||||
// Look for CRLF in what we've read so far.
|
||||
for (int i = 0; i < total - 1; i++)
|
||||
{
|
||||
if (buf[i] == '\r' && buf[i + 1] == '\n')
|
||||
{
|
||||
crlfAt = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (crlfAt >= 0) break;
|
||||
}
|
||||
|
||||
if (crlfAt < 0)
|
||||
throw Wrap(ProxyProtoErrors.Invalid, "v1 line too long");
|
||||
|
||||
return ParseV1Line(buf.AsSpan(0, crlfAt));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Synchronous v1 parser. Mirrors Go <c>readProxyProtoV1Header</c>.
|
||||
/// </summary>
|
||||
internal static ProxyProtocolAddress? ReadV1Header(Stream stream)
|
||||
{
|
||||
int maxRemaining = ProxyProtoConstants.V1MaxLineLen - 6;
|
||||
var buf = new byte[maxRemaining];
|
||||
int total = 0;
|
||||
int crlfAt = -1;
|
||||
|
||||
while (total < maxRemaining)
|
||||
{
|
||||
int n = stream.Read(buf, total, maxRemaining - total);
|
||||
if (n == 0) throw new EndOfStreamException("failed to read v1 line");
|
||||
total += n;
|
||||
|
||||
for (int i = 0; i < total - 1; i++)
|
||||
{
|
||||
if (buf[i] == '\r' && buf[i + 1] == '\n')
|
||||
{
|
||||
crlfAt = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (crlfAt >= 0) break;
|
||||
}
|
||||
|
||||
if (crlfAt < 0)
|
||||
throw Wrap(ProxyProtoErrors.Invalid, "v1 line too long");
|
||||
|
||||
return ParseV1Line(buf.AsSpan(0, crlfAt));
|
||||
}
|
||||
|
||||
private static ProxyProtocolAddress? ParseV1Line(ReadOnlySpan<byte> line)
|
||||
{
|
||||
var text = Encoding.ASCII.GetString(line).Trim();
|
||||
var parts = text.Split((char[]?)null, StringSplitOptions.RemoveEmptyEntries);
|
||||
|
||||
if (parts.Length < 1)
|
||||
throw Wrap(ProxyProtoErrors.Invalid, "invalid v1 format");
|
||||
|
||||
// UNKNOWN is a health-check (like LOCAL in v2).
|
||||
if (parts[0] == ProxyProtoConstants.V1Unknown)
|
||||
return null;
|
||||
|
||||
if (parts.Length != 5)
|
||||
throw Wrap(ProxyProtoErrors.Invalid, "invalid v1 format");
|
||||
|
||||
var protocol = parts[0];
|
||||
if (!IPAddress.TryParse(parts[1], out var srcIp) || !IPAddress.TryParse(parts[2], out var dstIp))
|
||||
throw Wrap(ProxyProtoErrors.Invalid, "invalid address");
|
||||
|
||||
if (!ushort.TryParse(parts[3], out var srcPort))
|
||||
throw new FormatException("invalid source port");
|
||||
if (!ushort.TryParse(parts[4], out var dstPort))
|
||||
throw new FormatException("invalid dest port");
|
||||
|
||||
// Validate protocol vs IP version.
|
||||
bool isIpv4 = srcIp.AddressFamily == AddressFamily.InterNetwork;
|
||||
if (protocol == ProxyProtoConstants.V1TCP4 && !isIpv4)
|
||||
throw Wrap(ProxyProtoErrors.Invalid, "TCP4 with IPv6 address");
|
||||
if (protocol == ProxyProtoConstants.V1TCP6 && isIpv4)
|
||||
throw Wrap(ProxyProtoErrors.Invalid, "TCP6 with IPv4 address");
|
||||
if (protocol != ProxyProtoConstants.V1TCP4 && protocol != ProxyProtoConstants.V1TCP6)
|
||||
throw Wrap(ProxyProtoErrors.Invalid, $"invalid protocol {protocol}");
|
||||
|
||||
return new ProxyProtocolAddress(srcIp, srcPort, dstIp, dstPort);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Internal: v2 parser
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
internal static async Task<ProxyProtocolAddress?> ParseV2HeaderAsync(
|
||||
Stream stream, byte[] header, CancellationToken ct)
|
||||
{
|
||||
return ParseV2Header(stream, header.AsSpan());
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parses PROXY protocol v2 after the signature has been validated.
|
||||
/// <paramref name="header"/> is the 4 bytes: ver/cmd, fam/proto, addr-len (2 bytes).
|
||||
/// Mirrors Go <c>parseProxyProtoV2Header</c>.
|
||||
/// </summary>
|
||||
internal static ProxyProtocolAddress? ParseV2Header(Stream stream, ReadOnlySpan<byte> header)
|
||||
{
|
||||
byte verCmd = header[0];
|
||||
byte version = (byte)(verCmd & ProxyProtoConstants.VerMask);
|
||||
byte command = (byte)(verCmd & ProxyProtoConstants.CmdMask);
|
||||
|
||||
if (version != ProxyProtoConstants.Ver2)
|
||||
throw Wrap(ProxyProtoErrors.Invalid, $"invalid version 0x{version:X2}");
|
||||
|
||||
byte famProto = header[1];
|
||||
byte family = (byte)(famProto & ProxyProtoConstants.FamilyMask);
|
||||
byte proto = (byte)(famProto & ProxyProtoConstants.ProtoMask);
|
||||
|
||||
ushort addrLen = BinaryPrimitives.ReadUInt16BigEndian(header[2..]);
|
||||
|
||||
// LOCAL command — health check.
|
||||
if (command == ProxyProtoConstants.CmdLocal)
|
||||
{
|
||||
if (addrLen > 0)
|
||||
DiscardBytes(stream, addrLen);
|
||||
return null;
|
||||
}
|
||||
|
||||
if (command != ProxyProtoConstants.CmdProxy)
|
||||
throw new InvalidDataException($"unknown PROXY protocol command: 0x{command:X2}");
|
||||
|
||||
if (proto != ProxyProtoConstants.ProtoStream)
|
||||
throw Wrap(ProxyProtoErrors.Unsupported, "only STREAM protocol supported");
|
||||
|
||||
switch (family)
|
||||
{
|
||||
case ProxyProtoConstants.FamilyInet:
|
||||
return ParseIPv4Addr(stream, addrLen);
|
||||
|
||||
case ProxyProtoConstants.FamilyInet6:
|
||||
return ParseIPv6Addr(stream, addrLen);
|
||||
|
||||
case ProxyProtoConstants.FamilyUnspec:
|
||||
if (addrLen > 0)
|
||||
DiscardBytes(stream, addrLen);
|
||||
return null;
|
||||
|
||||
default:
|
||||
throw Wrap(ProxyProtoErrors.Unsupported, $"unsupported address family 0x{family:X2}");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parses IPv4 address data.
|
||||
/// Mirrors Go <c>parseIPv4Addr</c>.
|
||||
/// </summary>
|
||||
internal static ProxyProtocolAddress ParseIPv4Addr(Stream stream, ushort addrLen)
|
||||
{
|
||||
if (addrLen < ProxyProtoConstants.AddrSizeIPv4)
|
||||
throw new InvalidDataException($"IPv4 address data too short: {addrLen} bytes");
|
||||
|
||||
var data = new byte[addrLen];
|
||||
ReadFull(stream, data);
|
||||
|
||||
var srcIp = new IPAddress(data[0..4]);
|
||||
var dstIp = new IPAddress(data[4..8]);
|
||||
var srcPort = BinaryPrimitives.ReadUInt16BigEndian(data.AsSpan(8, 2));
|
||||
var dstPort = BinaryPrimitives.ReadUInt16BigEndian(data.AsSpan(10, 2));
|
||||
|
||||
return new ProxyProtocolAddress(srcIp, srcPort, dstIp, dstPort);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Parses IPv6 address data.
|
||||
/// Mirrors Go <c>parseIPv6Addr</c>.
|
||||
/// </summary>
|
||||
internal static ProxyProtocolAddress ParseIPv6Addr(Stream stream, ushort addrLen)
|
||||
{
|
||||
if (addrLen < ProxyProtoConstants.AddrSizeIPv6)
|
||||
throw new InvalidDataException($"IPv6 address data too short: {addrLen} bytes");
|
||||
|
||||
var data = new byte[addrLen];
|
||||
ReadFull(stream, data);
|
||||
|
||||
var srcIp = new IPAddress(data[0..16]);
|
||||
var dstIp = new IPAddress(data[16..32]);
|
||||
var srcPort = BinaryPrimitives.ReadUInt16BigEndian(data.AsSpan(32, 2));
|
||||
var dstPort = BinaryPrimitives.ReadUInt16BigEndian(data.AsSpan(34, 2));
|
||||
|
||||
return new ProxyProtocolAddress(srcIp, srcPort, dstIp, dstPort);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// I/O helpers
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Fills <paramref name="buf"/> completely, throwing <see cref="EndOfStreamException"/>
|
||||
/// (wrapping as <see cref="IOException"/> with <see cref="UnexpectedEofException"/>)
|
||||
/// on short reads.
|
||||
/// </summary>
|
||||
[MethodImpl(MethodImplOptions.AggressiveInlining)]
|
||||
internal static void ReadFull(Stream stream, byte[] buf)
|
||||
{
|
||||
int total = 0;
|
||||
while (total < buf.Length)
|
||||
{
|
||||
int n = stream.Read(buf, total, buf.Length - total);
|
||||
if (n == 0)
|
||||
throw new IOException("unexpected EOF", new EndOfStreamException());
|
||||
total += n;
|
||||
}
|
||||
}
|
||||
|
||||
internal static async Task ReadFullAsync(Stream stream, byte[] buf, CancellationToken ct)
|
||||
{
|
||||
int total = 0;
|
||||
while (total < buf.Length)
|
||||
{
|
||||
int n = await stream.ReadAsync(buf.AsMemory(total), ct).ConfigureAwait(false);
|
||||
if (n == 0)
|
||||
throw new IOException("unexpected EOF", new EndOfStreamException());
|
||||
total += n;
|
||||
}
|
||||
}
|
||||
|
||||
private static void DiscardBytes(Stream stream, int count)
|
||||
{
|
||||
var discard = new byte[count];
|
||||
ReadFull(stream, discard);
|
||||
}
|
||||
|
||||
private static Exception Wrap(Exception sentinel, string detail)
|
||||
{
|
||||
// Create a new exception that wraps the sentinel but carries the extra detail.
|
||||
// The sentinel remains identifiable via the Message prefix (checked in tests with IsAssignableTo).
|
||||
return new InvalidDataException($"{sentinel.Message}: {detail}", sentinel);
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// StreamAdapter — wraps a byte array as a Stream (for test convenience)
|
||||
// ============================================================================
|
||||
|
||||
/// <summary>
|
||||
/// Minimal read-only <see cref="Stream"/> backed by a byte array.
|
||||
/// Used by test helpers to feed proxy protocol bytes into the parser.
|
||||
/// </summary>
|
||||
internal sealed class ByteArrayStream : Stream
|
||||
{
|
||||
private readonly byte[] _data;
|
||||
private int _pos;
|
||||
|
||||
public ByteArrayStream(byte[] data) { _data = data; }
|
||||
|
||||
public override bool CanRead => true;
|
||||
public override bool CanSeek => false;
|
||||
public override bool CanWrite => false;
|
||||
public override long Length => _data.Length;
|
||||
public override long Position { get => _pos; set => throw new NotSupportedException(); }
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count)
|
||||
{
|
||||
int available = _data.Length - _pos;
|
||||
if (available <= 0) return 0;
|
||||
int toCopy = Math.Min(count, available);
|
||||
Buffer.BlockCopy(_data, _pos, buffer, offset, toCopy);
|
||||
_pos += toCopy;
|
||||
return toCopy;
|
||||
}
|
||||
|
||||
public override void Flush() => throw new NotSupportedException();
|
||||
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
|
||||
public override void SetLength(long value) => throw new NotSupportedException();
|
||||
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
|
||||
|
||||
public void SetReadTimeout(int timeout) { }
|
||||
public void SetWriteTimeout(int timeout) { }
|
||||
}
|
||||
Reference in New Issue
Block a user