"""Helpers to run the official **at-runner** Docker image for local gRPC.
Used by :class:`~at_py.backend.AtRunnerBackend` when ``use_docker=True`` and by tests.
"""
from __future__ import annotations
import shutil
import socket
import subprocess
import time
from at_py._runner_pin import AT_RUNNER_CLIENT_REQUIREMENT, AT_RUNNER_GIT_REF
# Default image reference: the ghcr.io at-runner image tagged with the git ref
# pinned in ``at_py._runner_pin``.
DEFAULT_AT_RUNNER_IMAGE = f"ghcr.io/jgebbie/at-runner:{AT_RUNNER_GIT_REF}"
# Default gRPC port; used as the default for both the host side and the
# container side of the port mapping in ``start_at_runner_container``.
DEFAULT_AT_RUNNER_GRPC_PORT = 50051
# Public API of this module. The two ``AT_RUNNER_*`` pin constants are imported
# from ``at_py._runner_pin`` and re-exported here.
__all__ = [
"AT_RUNNER_CLIENT_REQUIREMENT",
"AT_RUNNER_GIT_REF",
"DEFAULT_AT_RUNNER_GRPC_PORT",
"DEFAULT_AT_RUNNER_IMAGE",
"assert_docker_available",
"grpc_target_for_host_port",
"start_at_runner_container",
"stop_at_runner_container",
"wait_tcp",
]
[docs]
def wait_tcp(host: str, port: int, *, timeout_s: float = 60.0) -> None:
    """Block until ``host:port`` accepts a TCP connection or ``timeout_s`` elapses.

    The endpoint is always probed at least once, even when ``timeout_s`` is
    zero or negative, so the function can also be used as a one-shot check.
    Each connection attempt and each pause between attempts is capped to the
    time remaining, so the call does not run noticeably past the deadline.

    Args:
        host: Hostname or IP address to connect to.
        port: TCP port to connect to.
        timeout_s: Overall time budget in seconds.

    Raises:
        TimeoutError: If no connection succeeded within the budget. The message
            includes the last connection error, which is also chained as
            ``__cause__``.
    """
    max_connect_timeout_s = 2.0
    # Floor for a single attempt: a socket timeout of 0 would mean
    # "non-blocking" and fail spuriously instead of actually probing.
    min_connect_timeout_s = 0.1
    retry_interval_s = 0.5

    deadline = time.monotonic() + timeout_s
    while True:
        remaining = deadline - time.monotonic()
        connect_timeout = min(
            max_connect_timeout_s, max(remaining, min_connect_timeout_s)
        )
        try:
            with socket.create_connection((host, port), timeout=connect_timeout):
                return
        except OSError as e:
            last_err = e

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            msg = f"{host}:{port} did not accept connections within {timeout_s}s"
            msg += f" (last error: {last_err})"
            raise TimeoutError(msg) from last_err
        # Never sleep past the deadline.
        time.sleep(min(retry_interval_s, remaining))
[docs]
def assert_docker_available() -> None:
    """Verify that a ``docker`` executable can be located on ``PATH``.

    Returns silently when the CLI is found.

    Raises:
        RuntimeError: If ``shutil.which`` cannot find ``docker``.
    """
    docker_path = shutil.which("docker")
    if docker_path is not None:
        return
    message = (
        "Docker mode requested but `docker` was not found on PATH. "
        "Install Docker and ensure the CLI is available, or use a remote gRPC target."
    )
    raise RuntimeError(message)
[docs]
def start_at_runner_container(
    *,
    image: str = DEFAULT_AT_RUNNER_IMAGE,
    host_port: int = DEFAULT_AT_RUNNER_GRPC_PORT,
    container_port: int = DEFAULT_AT_RUNNER_GRPC_PORT,
) -> str:
    """Launch the runner image in the background and return its container id.

    The container is started with ``docker run -d --rm``, publishes
    ``host_port:container_port`` for the gRPC service, and gets a tmpfs
    mounted at ``/workspace``. Pass the returned id to
    :func:`stop_at_runner_container` to shut it down.

    Args:
        image: Image reference to run.
        host_port: Port published on the host.
        container_port: Port inside the container that ``host_port`` maps to.

    Returns:
        The container id printed by ``docker run`` (whitespace-stripped).

    Raises:
        RuntimeError: If the ``docker`` CLI is missing, ``docker run`` exits
            non-zero, or it prints no container id.
        subprocess.TimeoutExpired: If ``docker run`` takes longer than 180 s.
    """
    assert_docker_available()

    port_mapping = f"{host_port}:{container_port}"
    docker_argv = ["docker", "run", "-d", "--rm"]
    docker_argv += ["-p", port_mapping]
    docker_argv += ["--tmpfs", "/workspace:rw,noexec,nosuid,size=512m"]
    docker_argv.append(image)

    # check=False: the exit status is inspected below so the error can carry
    # docker's own output.
    result = subprocess.run(
        docker_argv,
        timeout=180,
        check=False,
        text=True,
        capture_output=True,
    )

    exit_code = result.returncode
    if exit_code != 0:
        # Prefer stderr; fall back to stdout when stderr is empty.
        detail = result.stderr or result.stdout or ""
        err = detail.strip()
        raise RuntimeError(f"docker run failed (exit {exit_code}): {err}")

    container_id = result.stdout.strip()
    if container_id:
        return container_id
    raise RuntimeError("docker run did not return a container id")
[docs]
def stop_at_runner_container(container_id: str, *, timeout_s: float = 120.0) -> None:
    """Invoke ``docker stop`` for ``container_id`` without checking its exit status.

    Args:
        container_id: Id of the container to stop, as returned by
            :func:`start_at_runner_container`.
        timeout_s: Maximum time in seconds to wait for the ``docker stop``
            command itself.

    Raises:
        subprocess.TimeoutExpired: If ``docker stop`` does not finish within
            ``timeout_s``.
    """
    stop_argv = ["docker", "stop", container_id]
    # check=False: a non-zero exit is deliberately ignored; output is captured
    # and discarded.
    subprocess.run(
        stop_argv,
        check=False,
        timeout=timeout_s,
        capture_output=True,
    )
[docs]
def grpc_target_for_host_port(host: str, port: int) -> str:
    """Format ``host:port`` for gRPC client ``target`` arguments.

    A bare IPv6 literal (for example ``::1``) is wrapped in square brackets,
    giving ``[::1]:50051``; without them the port separator would be
    ambiguous. Hostnames, IPv4 addresses, already-bracketed hosts, and any
    other string that does not parse as an IPv6 address are joined as-is.

    Args:
        host: Hostname or IP address.
        port: TCP port number.

    Returns:
        The ``host:port`` target string.
    """
    # Local import keeps this helper self-contained.
    import ipaddress

    try:
        is_ipv6_literal = ipaddress.ip_address(host).version == 6
    except ValueError:
        # Not an IP literal at all (hostname, "[::1]", URI-style target, ...).
        is_ipv6_literal = False

    if is_ipv6_literal:
        return f"[{host}]:{port}"
    return f"{host}:{port}"