Source code for at_py.backend

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Protocol

from at_py.runner_docker import (
    AT_RUNNER_CLIENT_REQUIREMENT,
    DEFAULT_AT_RUNNER_GRPC_PORT,
    DEFAULT_AT_RUNNER_IMAGE,
    grpc_target_for_host_port,
    start_at_runner_container,
    stop_at_runner_container,
    wait_tcp,
)
from at_py.types import RunResult

# Install instructions appended to the error raised when the lazy
# ``import at_runner`` in ``AtRunnerBackend.run_sync`` fails.
_AT_RUNNER_INSTALL_HINT = "".join(
    [
        f"pip install '{AT_RUNNER_CLIENT_REQUIREMENT}'\n",
        "This installs the at-runner Python gRPC client from the tested public tag.",
    ]
)


class RunnerBackend(Protocol):
    """Structural interface for objects that can execute one model run.

    Any object exposing a compatible ``run_sync`` method satisfies this
    protocol; no inheritance is required.
    """

    def run_sync(
        self,
        *,
        target: str,
        model: str,
        file_root: str,
        inputs: dict[str, bytes] | None = None,
        timeout: int | None = None,
    ) -> RunResult:
        """Run one model synchronously; implementations wrap ``at_runner.run_sync``.

        All arguments are keyword-only.

        Args:
            target: gRPC address of the runner.
            model: Model to run.
            file_root: File root handed to the runner.
            inputs: Optional mapping of name to raw bytes.
            timeout: Optional timeout for the run.

        Returns:
            The outcome of the run as a :class:`~at_py.types.RunResult`.
        """
        ...


@dataclass
class AtRunnerBackend:
    """Backend that calls ``at_runner.run_sync`` (lazy import).

    **Remote (default):** use ``ATClient(target="host:port")``; this backend
    forwards that gRPC address to the installed ``at_runner`` package.

    **Docker:** set ``use_docker=True`` to start the official image
    (:data:`~at_py.runner_docker.DEFAULT_AT_RUNNER_IMAGE`) on first use, map
    ``host_port`` to the container gRPC port, then connect to ``host_port`` on
    ``connect_host`` (``127.0.0.1`` by default). Requires the ``docker`` CLI on
    ``PATH``. Call :meth:`close` (or use :class:`~at_py.client.ATClient` as a
    context manager) to stop the container.
    """

    # When True, ``run_sync`` ignores the caller's ``target`` and talks to a
    # container that this backend starts on first use.
    use_docker: bool = False
    # Image passed to ``start_at_runner_container``.
    docker_image: str = DEFAULT_AT_RUNNER_IMAGE
    # Port on the host side of the mapping; also the port that is polled for
    # readiness and used to build the gRPC target in Docker mode.
    host_port: int = DEFAULT_AT_RUNNER_GRPC_PORT
    # Port on the container side of the mapping.
    docker_grpc_port: int = DEFAULT_AT_RUNNER_GRPC_PORT
    # Host used for the readiness probe and for the gRPC target in Docker mode.
    connect_host: str = "127.0.0.1"
    # Id of the container started by this instance; ``None`` while none is running.
    _container_id: str | None = field(default=None, init=False, repr=False)

    def run_sync(
        self,
        *,
        target: str,
        model: str,
        file_root: str,
        inputs: dict[str, bytes] | None = None,
        timeout: int | None = None,
    ) -> RunResult:
        """Call ``at_runner.run_sync`` on ``target`` (or the Docker address when configured).

        Args:
            target: gRPC address to use in remote mode; ignored when
                ``use_docker`` is True.
            model: Forwarded unchanged to ``at_runner.run_sync``.
            file_root: Forwarded unchanged to ``at_runner.run_sync``.
            inputs: Forwarded unchanged to ``at_runner.run_sync``.
            timeout: Forwarded unchanged to ``at_runner.run_sync``.

        Returns:
            A :class:`~at_py.types.RunResult` copied field by field from the
            ``at_runner`` result (``files`` is copied into a new ``dict``).

        Raises:
            RuntimeError: If ``at_runner`` cannot be imported.
        """
        # Import lazily so this module can be imported without the optional
        # gRPC client installed.
        try:
            import at_runner  # type: ignore
        except Exception as e:  # pragma: no cover
            # A real newline here, so the install hint starts on its own line.
            raise RuntimeError(
                "The `at_runner` package is not installed. Install the gRPC client, e.g.:\n"
                + _AT_RUNNER_INSTALL_HINT
            ) from e

        effective_target = self._effective_target(target)
        r = at_runner.run_sync(
            effective_target,
            model=model,
            file_root=file_root,
            inputs=inputs,
            timeout=timeout,
        )
        return RunResult(
            status=r.status,
            exit_code=r.exit_code,
            stdout=r.stdout,
            stderr=r.stderr,
            elapsed=r.elapsed,
            files=dict(r.files),
        )

    def _effective_target(self, target: str) -> str:
        """Return the gRPC address: pass-through in remote mode, else the Docker address.

        In Docker mode this starts the container if it is not running yet and
        returns ``grpc_target_for_host_port(connect_host, host_port)``.
        """
        if not self.use_docker:
            return target
        self._ensure_docker()
        return grpc_target_for_host_port(self.connect_host, self.host_port)

    def _ensure_docker(self) -> None:
        """Start the runner container once and wait until the gRPC port accepts connections.

        Does nothing when a container id is already recorded. If the readiness
        wait fails for any reason, the container is stopped and the recorded id
        is cleared before the error is re-raised, so a later call starts a
        fresh container instead of reusing one that never became ready.
        """
        if self._container_id is not None:
            return
        cid = start_at_runner_container(
            image=self.docker_image,
            host_port=self.host_port,
            container_port=self.docker_grpc_port,
        )
        # Record the id before waiting so ``close`` can stop the container
        # even if the wait below is interrupted.
        self._container_id = cid
        try:
            wait_tcp(self.connect_host, self.host_port, timeout_s=60.0)
        except Exception:
            # Roll back on every failure, not only ``TimeoutError``; otherwise
            # the early return above would treat this container as ready.
            stop_at_runner_container(cid)
            self._container_id = None
            raise

    def close(self) -> None:
        """Stop the Docker container started for ``use_docker=True``, if any."""
        if self._container_id is not None:
            stop_at_runner_container(self._container_id)
            self._container_id = None