155 lines
5.1 KiB
Python
155 lines
5.1 KiB
Python
"""TLS behaviour tests for ``create_channel``.
|
|
|
|
These spin up a real loopback ``grpc.aio`` server with a freshly generated
|
|
self-signed certificate (carrying a ``localhost`` SAN, mirroring the gateway's
|
|
auto-generated cert) and assert the lenient TOFU default lets a client connect
|
|
without any CA configured.
|
|
|
|
Marked ``tls`` and skipped unless ``MXGATEWAY_RUN_TLS_TESTS=1`` because loopback
|
|
TLS handshakes can be timing-flaky on shared CI runners. This mirrors how the
|
|
suite gates anything that depends on real sockets rather than fakes.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import shutil
|
|
import socket
|
|
import ssl
|
|
import subprocess
|
|
import tempfile
|
|
from collections.abc import AsyncIterator
|
|
from pathlib import Path
|
|
|
|
import grpc
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
from zb_mom_ww_mxgateway import ClientOptions
|
|
from zb_mom_ww_mxgateway.errors import MxGatewayTransportError
|
|
from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb
|
|
from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2_grpc as pb_grpc
|
|
from zb_mom_ww_mxgateway.options import create_channel
|
|
|
|
pytestmark = pytest.mark.tls
|
|
|
|
_RUN_TLS_TESTS = os.environ.get("MXGATEWAY_RUN_TLS_TESTS") == "1"
|
|
_OPENSSL = shutil.which("openssl")
|
|
|
|
requires_tls = pytest.mark.skipif(
|
|
not _RUN_TLS_TESTS,
|
|
reason="set MXGATEWAY_RUN_TLS_TESTS=1 to run loopback TLS tests",
|
|
)
|
|
requires_openssl = pytest.mark.skipif(
|
|
_OPENSSL is None,
|
|
reason="openssl CLI is required to generate a self-signed test certificate",
|
|
)
|
|
|
|
|
|
def _generate_self_signed_cert(directory: Path) -> tuple[Path, Path]:
|
|
"""Generate a self-signed cert/key pair with a ``localhost`` SAN."""
|
|
key_path = directory / "server.key"
|
|
cert_path = directory / "server.crt"
|
|
subprocess.run(
|
|
[
|
|
str(_OPENSSL),
|
|
"req",
|
|
"-x509",
|
|
"-newkey",
|
|
"rsa:2048",
|
|
"-nodes",
|
|
"-keyout",
|
|
str(key_path),
|
|
"-out",
|
|
str(cert_path),
|
|
"-days",
|
|
"1",
|
|
"-subj",
|
|
"/CN=mxgateway-test",
|
|
"-addext",
|
|
"subjectAltName=DNS:localhost,IP:127.0.0.1",
|
|
],
|
|
check=True,
|
|
capture_output=True,
|
|
)
|
|
return cert_path, key_path
|
|
|
|
|
|
def _free_port() -> int:
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
|
sock.bind(("127.0.0.1", 0))
|
|
return int(sock.getsockname()[1])
|
|
|
|
|
|
class _StaticGatewayServicer(pb_grpc.MxAccessGatewayServicer):
|
|
"""Minimal servicer answering ``OpenSession`` with a fixed session id."""
|
|
|
|
async def OpenSession( # noqa: N802 - generated gRPC method name
|
|
self, request: pb.OpenSessionRequest, context: object
|
|
) -> pb.OpenSessionReply:
|
|
return pb.OpenSessionReply(session_id="tls-session-1")
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def tls_server() -> AsyncIterator[int]:
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
cert_path, key_path = _generate_self_signed_cert(Path(tmp))
|
|
credentials = grpc.ssl_server_credentials(
|
|
[(key_path.read_bytes(), cert_path.read_bytes())]
|
|
)
|
|
server = grpc.aio.server()
|
|
pb_grpc.add_MxAccessGatewayServicer_to_server(_StaticGatewayServicer(), server)
|
|
port = _free_port()
|
|
server.add_secure_port(f"127.0.0.1:{port}", credentials)
|
|
await server.start()
|
|
try:
|
|
yield port
|
|
finally:
|
|
await server.stop(grace=None)
|
|
|
|
|
|
@requires_tls
|
|
@requires_openssl
|
|
@pytest.mark.asyncio
|
|
async def test_default_tls_connects_via_tofu(tls_server: int) -> None:
|
|
"""Default TLS options (no CA) connect by pinning the presented cert."""
|
|
options = ClientOptions(
|
|
endpoint=f"127.0.0.1:{tls_server}",
|
|
api_key="mxgw_test_secret",
|
|
)
|
|
channel = create_channel(options)
|
|
try:
|
|
stub = pb_grpc.MxAccessGatewayStub(channel)
|
|
reply = await stub.OpenSession(pb.OpenSessionRequest(), timeout=10)
|
|
assert reply.session_id == "tls-session-1"
|
|
finally:
|
|
await channel.close()
|
|
|
|
|
|
def test_split_authority_parses_host_and_port() -> None:
|
|
from zb_mom_ww_mxgateway.options import _split_authority
|
|
|
|
assert _split_authority("https://10.0.0.5:5120") == ("10.0.0.5", 5120)
|
|
assert _split_authority("localhost:5120") == ("localhost", 5120)
|
|
assert _split_authority(":5120") == ("localhost", 5120)
|
|
|
|
|
|
def test_tofu_connect_failure_raises_transport_error() -> None:
|
|
"""A failed cert pre-fetch surfaces the client's transport error type."""
|
|
options = ClientOptions(endpoint=f"127.0.0.1:{_free_port()}")
|
|
with pytest.raises(MxGatewayTransportError) as excinfo:
|
|
create_channel(options)
|
|
assert options.endpoint in str(excinfo.value)
|
|
|
|
|
|
def test_require_certificate_validation_uses_system_trust() -> None:
|
|
"""``require_certificate_validation`` must not attempt a TOFU pre-fetch."""
|
|
# Pointing at a closed port: with system-trust the channel is created lazily
|
|
# (no eager pre-fetch), so create_channel must succeed without connecting.
|
|
options = ClientOptions(
|
|
endpoint=f"127.0.0.1:{_free_port()}",
|
|
require_certificate_validation=True,
|
|
)
|
|
channel = create_channel(options)
|
|
assert isinstance(channel, grpc.aio.Channel)
|