Source code for at_py.runner_docker

"""Helpers to run the official **at-runner** Docker image for local gRPC.

Used by :class:`~at_py.backend.AtRunnerBackend` when ``use_docker=True`` and by tests.
"""

from __future__ import annotations

import shutil
import socket
import subprocess
import time

from at_py._runner_pin import AT_RUNNER_CLIENT_REQUIREMENT, AT_RUNNER_GIT_REF

DEFAULT_AT_RUNNER_IMAGE = f"ghcr.io/jgebbie/at-runner:{AT_RUNNER_GIT_REF}"
DEFAULT_AT_RUNNER_GRPC_PORT = 50051

__all__ = [
    "AT_RUNNER_CLIENT_REQUIREMENT",
    "AT_RUNNER_GIT_REF",
    "DEFAULT_AT_RUNNER_GRPC_PORT",
    "DEFAULT_AT_RUNNER_IMAGE",
    "assert_docker_available",
    "grpc_target_for_host_port",
    "start_at_runner_container",
    "stop_at_runner_container",
    "wait_tcp",
]


[docs] def wait_tcp(host: str, port: int, *, timeout_s: float = 60.0) -> None: """Block until ``host:port`` accepts a TCP connection or ``timeout_s`` elapses.""" deadline = time.monotonic() + timeout_s last_err: OSError | None = None while time.monotonic() < deadline: try: with socket.create_connection((host, port), timeout=2.0): return except OSError as e: last_err = e time.sleep(0.5) msg = f"{host}:{port} did not accept connections within {timeout_s}s" if last_err is not None: msg += f" (last error: {last_err})" raise TimeoutError(msg)
[docs] def assert_docker_available() -> None: """Raise ``RuntimeError`` if the ``docker`` CLI is not on ``PATH``.""" if shutil.which("docker") is None: raise RuntimeError( "Docker mode requested but `docker` was not found on PATH. " "Install Docker and ensure the CLI is available, or use a remote gRPC target." )
[docs] def start_at_runner_container( *, image: str = DEFAULT_AT_RUNNER_IMAGE, host_port: int = DEFAULT_AT_RUNNER_GRPC_PORT, container_port: int = DEFAULT_AT_RUNNER_GRPC_PORT, ) -> str: """Start a detached container; return **container id** (for :func:`stop_at_runner_container`). Maps ``host_port:container_port`` for the gRPC service. Raises on failure. """ assert_docker_available() cmd = [ "docker", "run", "-d", "--rm", "-p", f"{host_port}:{container_port}", "--tmpfs", "/workspace:rw,noexec,nosuid,size=512m", image, ] proc = subprocess.run( cmd, capture_output=True, text=True, check=False, timeout=180, ) if proc.returncode != 0: err = (proc.stderr or proc.stdout or "").strip() raise RuntimeError(f"docker run failed (exit {proc.returncode}): {err}") container_id = proc.stdout.strip() if not container_id: raise RuntimeError("docker run did not return a container id") return container_id
[docs] def stop_at_runner_container(container_id: str, *, timeout_s: float = 120.0) -> None: """Run ``docker stop`` on the given container id (ignores non-zero exit).""" subprocess.run( ["docker", "stop", container_id], capture_output=True, timeout=timeout_s, check=False, )
[docs] def grpc_target_for_host_port(host: str, port: int) -> str: """Format ``host:port`` for gRPC client ``target`` arguments.""" return f"{host}:{port}"