from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol
from at_py.runner_docker import (
AT_RUNNER_CLIENT_REQUIREMENT,
DEFAULT_AT_RUNNER_GRPC_PORT,
DEFAULT_AT_RUNNER_IMAGE,
grpc_target_for_host_port,
start_at_runner_container,
stop_at_runner_container,
wait_tcp,
)
from at_py.types import RunResult
# Two-line install hint appended to the RuntimeError raised when the lazy
# ``import at_runner`` fails.
_AT_RUNNER_INSTALL_HINT = "\n".join(
    (
        f"pip install '{AT_RUNNER_CLIENT_REQUIREMENT}'",
        "This installs the at-runner Python gRPC client from the tested public tag.",
    )
)
class RunnerBackend(Protocol):
    """Structural interface for objects that can run one model synchronously.

    Any object exposing a compatible ``run_sync`` method satisfies this
    protocol; no inheritance is required.
    """

    def run_sync(
        self,
        *,
        target: str,
        model: str,
        file_root: str,
        inputs: dict[str, bytes] | None = None,
        timeout: int | None = None,
    ) -> RunResult:
        """Run one model synchronously; implementations wrap ``at_runner.run_sync``.

        Every argument is keyword-only. ``inputs`` and ``timeout`` are
        optional and default to ``None``.

        Returns:
            The outcome of the run as a :class:`RunResult`.
        """
        ...
@dataclass
class AtRunnerBackend:
    """Backend that calls ``at_runner.run_sync`` (lazy import).

    **Remote (default):** use ``ATClient(target="host:port")``; this backend forwards
    that gRPC address to the installed ``at_runner`` package.

    **Docker:** set ``use_docker=True`` to start the official image
    (:data:`~at_py.runner_docker.DEFAULT_AT_RUNNER_IMAGE`) on first use, map
    ``host_port`` to the container gRPC port, then connect to
    ``connect_host:host_port`` (``127.0.0.1`` by default).
    Requires the ``docker`` CLI on ``PATH``. Call :meth:`close` (or use
    :class:`~at_py.client.ATClient` as a context manager) to stop the container.
    """

    # When True, a container is started on first use and ``target`` is ignored.
    use_docker: bool = False
    # Image passed to ``start_at_runner_container``.
    docker_image: str = DEFAULT_AT_RUNNER_IMAGE
    # Host-side port that is mapped to the container and connected to.
    host_port: int = DEFAULT_AT_RUNNER_GRPC_PORT
    # Port inside the container that ``host_port`` is mapped to.
    docker_grpc_port: int = DEFAULT_AT_RUNNER_GRPC_PORT
    # Host used both for the readiness probe and for the gRPC target in Docker mode.
    connect_host: str = "127.0.0.1"
    # Id of the container started by this backend; ``None`` while none is running.
    _container_id: str | None = field(default=None, init=False, repr=False)

    def run_sync(
        self,
        *,
        target: str,
        model: str,
        file_root: str,
        inputs: dict[str, bytes] | None = None,
        timeout: int | None = None,
    ) -> RunResult:
        """Call ``at_runner.run_sync`` on ``target`` (or Docker localhost when configured).

        Args:
            target: gRPC address used as-is in remote mode; ignored when
                ``use_docker`` is True.
            model: Forwarded unchanged to ``at_runner.run_sync``.
            file_root: Forwarded unchanged to ``at_runner.run_sync``.
            inputs: Forwarded unchanged to ``at_runner.run_sync``.
            timeout: Forwarded unchanged to ``at_runner.run_sync``.

        Returns:
            A :class:`RunResult` built from the runner's ``status``,
            ``exit_code``, ``stdout``, ``stderr``, ``elapsed`` and a ``dict``
            copy of its ``files``.

        Raises:
            RuntimeError: If importing ``at_runner`` fails; the message
                includes an install hint.
            TimeoutError: In Docker mode, if waiting for the mapped port
                times out.
        """
        # Imported lazily so this module can be imported without the client installed.
        try:
            import at_runner  # type: ignore
        except Exception as e:  # pragma: no cover
            # Any import-time failure is surfaced as a RuntimeError with an install hint.
            raise RuntimeError(
                "The `at_runner` package is not installed. Install the gRPC client, e.g.:\n"
                + _AT_RUNNER_INSTALL_HINT
            ) from e

        effective_target = self._effective_target(target)
        r = at_runner.run_sync(
            effective_target,
            model=model,
            file_root=file_root,
            inputs=inputs,
            timeout=timeout,
        )
        return RunResult(
            status=r.status,
            exit_code=r.exit_code,
            stdout=r.stdout,
            stderr=r.stderr,
            elapsed=r.elapsed,
            files=dict(r.files),
        )

    def _effective_target(self, target: str) -> str:
        """Return gRPC address: pass-through in remote mode, else localhost + mapped port."""
        if not self.use_docker:
            return target
        # Docker mode: make sure the container is up, then ignore ``target``.
        self._ensure_docker()
        return grpc_target_for_host_port(self.connect_host, self.host_port)

    def _ensure_docker(self) -> None:
        """Start the runner container once and wait until the gRPC port accepts connections.

        Does nothing when a container id is already recorded. If the wait
        fails for any reason the container is stopped, the recorded id is
        cleared, and the original exception is re-raised.
        """
        if self._container_id is not None:
            return
        cid = start_at_runner_container(
            image=self.docker_image,
            host_port=self.host_port,
            container_port=self.docker_grpc_port,
        )
        # Recorded before waiting; cleared again below if the wait fails.
        self._container_id = cid
        try:
            wait_tcp(self.connect_host, self.host_port, timeout_s=60.0)
        except Exception:
            # Clean up on every failure, not only TimeoutError: otherwise the
            # container keeps running and later calls would skip the readiness
            # wait because an id is still recorded.
            stop_at_runner_container(cid)
            self._container_id = None
            raise

    def close(self) -> None:
        """Stop the Docker container started for ``use_docker=True``, if any.

        Does nothing when no container id is recorded, so calling it
        repeatedly is harmless.
        """
        if self._container_id is not None:
            stop_at_runner_container(self._container_id)
            self._container_id = None