feat(client-python): accept gateway cert by default via TOFU pre-fetch
This commit is contained in:
@@ -0,0 +1,154 @@
|
||||
"""TLS behaviour tests for ``create_channel``.
|
||||
|
||||
These spin up a real loopback ``grpc.aio`` server with a freshly generated
|
||||
self-signed certificate (carrying a ``localhost`` SAN, mirroring the gateway's
|
||||
auto-generated cert) and assert the lenient TOFU default lets a client connect
|
||||
without any CA configured.
|
||||
|
||||
Marked ``tls`` and skipped unless ``MXGATEWAY_RUN_TLS_TESTS=1`` because loopback
|
||||
TLS handshakes can be timing-flaky on shared CI runners. This mirrors how the
|
||||
suite gates anything that depends on real sockets rather than fakes.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import socket
|
||||
import ssl
|
||||
import subprocess
|
||||
import tempfile
|
||||
from collections.abc import AsyncIterator
|
||||
from pathlib import Path
|
||||
|
||||
import grpc
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from zb_mom_ww_mxgateway import ClientOptions
|
||||
from zb_mom_ww_mxgateway.errors import MxGatewayTransportError
|
||||
from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2 as pb
|
||||
from zb_mom_ww_mxgateway.generated import mxaccess_gateway_pb2_grpc as pb_grpc
|
||||
from zb_mom_ww_mxgateway.options import create_channel
|
||||
|
||||
pytestmark = pytest.mark.tls
|
||||
|
||||
_RUN_TLS_TESTS = os.environ.get("MXGATEWAY_RUN_TLS_TESTS") == "1"
|
||||
_OPENSSL = shutil.which("openssl")
|
||||
|
||||
requires_tls = pytest.mark.skipif(
|
||||
not _RUN_TLS_TESTS,
|
||||
reason="set MXGATEWAY_RUN_TLS_TESTS=1 to run loopback TLS tests",
|
||||
)
|
||||
requires_openssl = pytest.mark.skipif(
|
||||
_OPENSSL is None,
|
||||
reason="openssl CLI is required to generate a self-signed test certificate",
|
||||
)
|
||||
|
||||
|
||||
def _generate_self_signed_cert(directory: Path) -> tuple[Path, Path]:
|
||||
"""Generate a self-signed cert/key pair with a ``localhost`` SAN."""
|
||||
key_path = directory / "server.key"
|
||||
cert_path = directory / "server.crt"
|
||||
subprocess.run(
|
||||
[
|
||||
str(_OPENSSL),
|
||||
"req",
|
||||
"-x509",
|
||||
"-newkey",
|
||||
"rsa:2048",
|
||||
"-nodes",
|
||||
"-keyout",
|
||||
str(key_path),
|
||||
"-out",
|
||||
str(cert_path),
|
||||
"-days",
|
||||
"1",
|
||||
"-subj",
|
||||
"/CN=mxgateway-test",
|
||||
"-addext",
|
||||
"subjectAltName=DNS:localhost,IP:127.0.0.1",
|
||||
],
|
||||
check=True,
|
||||
capture_output=True,
|
||||
)
|
||||
return cert_path, key_path
|
||||
|
||||
|
||||
def _free_port() -> int:
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
sock.bind(("127.0.0.1", 0))
|
||||
return int(sock.getsockname()[1])
|
||||
|
||||
|
||||
class _StaticGatewayServicer(pb_grpc.MxAccessGatewayServicer):
|
||||
"""Minimal servicer answering ``OpenSession`` with a fixed session id."""
|
||||
|
||||
async def OpenSession( # noqa: N802 - generated gRPC method name
|
||||
self, request: pb.OpenSessionRequest, context: object
|
||||
) -> pb.OpenSessionReply:
|
||||
return pb.OpenSessionReply(session_id="tls-session-1")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def tls_server() -> AsyncIterator[int]:
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
cert_path, key_path = _generate_self_signed_cert(Path(tmp))
|
||||
credentials = grpc.ssl_server_credentials(
|
||||
[(key_path.read_bytes(), cert_path.read_bytes())]
|
||||
)
|
||||
server = grpc.aio.server()
|
||||
pb_grpc.add_MxAccessGatewayServicer_to_server(_StaticGatewayServicer(), server)
|
||||
port = _free_port()
|
||||
server.add_secure_port(f"127.0.0.1:{port}", credentials)
|
||||
await server.start()
|
||||
try:
|
||||
yield port
|
||||
finally:
|
||||
await server.stop(grace=None)
|
||||
|
||||
|
||||
@requires_tls
|
||||
@requires_openssl
|
||||
@pytest.mark.asyncio
|
||||
async def test_default_tls_connects_via_tofu(tls_server: int) -> None:
|
||||
"""Default TLS options (no CA) connect by pinning the presented cert."""
|
||||
options = ClientOptions(
|
||||
endpoint=f"127.0.0.1:{tls_server}",
|
||||
api_key="mxgw_test_secret",
|
||||
)
|
||||
channel = create_channel(options)
|
||||
try:
|
||||
stub = pb_grpc.MxAccessGatewayStub(channel)
|
||||
reply = await stub.OpenSession(pb.OpenSessionRequest(), timeout=10)
|
||||
assert reply.session_id == "tls-session-1"
|
||||
finally:
|
||||
await channel.close()
|
||||
|
||||
|
||||
def test_split_authority_parses_host_and_port() -> None:
|
||||
from zb_mom_ww_mxgateway.options import _split_authority
|
||||
|
||||
assert _split_authority("https://10.0.0.5:5120") == ("10.0.0.5", 5120)
|
||||
assert _split_authority("localhost:5120") == ("localhost", 5120)
|
||||
assert _split_authority(":5120") == ("localhost", 5120)
|
||||
|
||||
|
||||
def test_tofu_connect_failure_raises_transport_error() -> None:
|
||||
"""A failed cert pre-fetch surfaces the client's transport error type."""
|
||||
options = ClientOptions(endpoint=f"127.0.0.1:{_free_port()}")
|
||||
with pytest.raises(MxGatewayTransportError) as excinfo:
|
||||
create_channel(options)
|
||||
assert options.endpoint in str(excinfo.value)
|
||||
|
||||
|
||||
def test_require_certificate_validation_uses_system_trust() -> None:
|
||||
"""``require_certificate_validation`` must not attempt a TOFU pre-fetch."""
|
||||
# Pointing at a closed port: with system-trust the channel is created lazily
|
||||
# (no eager pre-fetch), so create_channel must succeed without connecting.
|
||||
options = ClientOptions(
|
||||
endpoint=f"127.0.0.1:{_free_port()}",
|
||||
require_certificate_validation=True,
|
||||
)
|
||||
channel = create_channel(options)
|
||||
assert isinstance(channel, grpc.aio.Channel)
|
||||
Reference in New Issue
Block a user