Source code for at_py.readwrite.env_bellhop

"""Bellhop / Bellhop3D ``.env`` tail parsing (Matlab ``read_env.m`` + ``read_bell.m``)."""

from __future__ import annotations

import math
from dataclasses import dataclass
from typing import Literal

import numpy as np

from at_py.readwrite.env import EnvCore, _parse_env_core_lines
from at_py.readwrite.vector import parse_readvector_lines


def _pad_run_type_five(quoted_inner: str) -> str:
    """Pad or truncate run-type to 5 characters (Matlab ``read_bell`` / Fortran width)."""
    if len(quoted_inner) < 5:
        return (quoted_inner + "     ")[:5]
    return quoted_inner[:5]


def _apply_read_bell_run_type_rules(rt: str) -> str:
    """Apply ``read_bell`` normalization to the 5-character run type."""
    ch = list(rt.ljust(5)[:5])
    if ch[0] == "a":
        ch[0] = "A"
    c = ch[1]
    if c not in ("C", "R", "S", "B"):
        ch[1] = "G"
    return "".join(ch)


def _build_numeric_tokens(lines: list[str], start: int) -> list[str]:
    """Collect whitespace tokens from ``lines[start:]``, skipping ``!`` comments and ``/``."""
    toks: list[str] = []
    for j in range(start, len(lines)):
        s = lines[j].strip()
        if not s:
            continue
        if "!" in s:
            s = s.split("!", 1)[0].strip()
        if not s:
            continue
        for w in s.split():
            if w == "/":
                continue
            toks.append(w)
    return toks


class _TokenCursor:
    """Sequential tokens for ``fscanf``-like numeric reads after the run-type line."""

    __slots__ = ("_t", "k")

    def __init__(self, tokens: list[str]) -> None:
        """Token stream for Bellhop tail parsing (see :func:`_parse_read_bell`)."""
        self._t = tokens
        self.k = 0

    def _peek(self) -> str | None:
        """Return next token without advancing, or ``None`` at EOF."""
        return self._t[self.k] if self.k < len(self._t) else None

    def exhausted(self) -> bool:
        """True when no tokens remain."""
        return self.k >= len(self._t)

    def trailing_preview(self, max_len: int = 120) -> str:
        """Debug helper: joined unread tokens, truncated to ``max_len`` chars."""
        return " ".join(self._t[self.k :])[:max_len]

    def next_raw(self) -> str:
        """Pop and return the next token string."""
        if self.k >= len(self._t):
            raise ValueError("unexpected end of token stream in read_bell")
        t = self._t[self.k]
        self.k += 1
        return t

    def next_float(self) -> float:
        """Pop next token and parse as ``float``."""
        t = self.next_raw()
        try:
            return float(t)
        except ValueError as e:
            raise ValueError(f"expected float, got {t!r}") from e

    def next_int(self) -> int:
        """Pop next token as float, then round to ``int`` (Matlab-style)."""
        return int(round(self.next_float()))


def _extract_quoted_token(tok: str) -> str:
    """Return inner text of a single-quoted token (e.g. ``'A'`` → ``A``)."""
    parts = tok.split("'")
    if len(parts) < 3:
        raise ValueError(f"expected quoted token, got {tok!r}")
    return parts[1]


def _quoted_inner_line(line: str) -> str:
    """First quoted substring from a full env line."""
    parts = line.split("'")
    if len(parts) < 3:
        raise ValueError(f"expected quoted string in line: {line!r}")
    return parts[1]


def _read_angle_fan(tok: _TokenCursor, n: int) -> tuple[np.ndarray, int]:
    """``alpha`` / ``beta`` block: Matlab ``fscanf`` then ``linspace`` when ``N > 2``."""
    if n == 1:
        return np.array([tok.next_float()], dtype=np.float64), 1
    a = tok.next_float()
    b = tok.next_float()
    if n > 2:
        arr = np.linspace(a, b, n, dtype=np.float64)
    else:
        arr = np.array([a, b], dtype=np.float64)[:n]
    n_out = int(arr.size)
    # Full 360° duplicate-end removal (Matlab read_bell)
    if n_out > 1 and abs((arr[-1] - arr[0]) % 360.0) < 10.0 * np.finfo(float).eps:
        n_out -= 1
        arr = arr[:-1]
    return arr, n_out


def _parse_read_bell(
    lines: list[str],
    start: int,
    *,
    top_opt_7: str,
    freq_hz: float,
    depth_bot_m: float,
    depth_top_m: float,
    rmax_km: float,
    model: Literal["BELLHOP", "BELLHOP3D"],
) -> tuple[BellhopBeamParams, int]:
    """Parse ``read_bell`` starting at ``lines[start]`` (run-type line). Returns next line index."""
    if start >= len(lines):
        raise ValueError("read_bell: missing run-type line")

    run_line = lines[start]
    inner = _quoted_inner_line(run_line)
    rt = _apply_read_bell_run_type_rules(_pad_run_type_five(inner))

    toks = _build_numeric_tokens(lines, start + 1)
    cur = _TokenCursor(toks)

    nbeams = cur.next_int()
    if nbeams == 0:
        nbeams = max(math.ceil(0.3 * 1000.0 * rmax_km * freq_hz / 1500.0), 300)

    ibeam: int | None = None
    if len(top_opt_7) > 5 and top_opt_7[5:6] == "I":
        ibeam = cur.next_int()

    alpha, nbeams_eff = _read_angle_fan(cur, nbeams)

    nbeta: int | None = None
    beta_deg: np.ndarray | None = None
    if model == "BELLHOP3D":
        nbeta_in = cur.next_int()
        beta_deg, nbeta = _read_angle_fan(cur, nbeta_in)
    else:
        nbeta = None
        beta_deg = None

    deltas = cur.next_float()
    if model == "BELLHOP":
        box_z = cur.next_float()
        box_r = cur.next_float()
        box_x = box_y = None
        if deltas == 0.0:
            deltas = (depth_bot_m - depth_top_m) / 10.0
    else:
        box_x = cur.next_float()
        box_y = cur.next_float()
        box_z = cur.next_float()
        box_r = None
        if deltas == 0.0:
            deltas = (depth_bot_m - depth_top_m) / 10.0

    rt2 = rt[1:2]
    beam_type: str | None = None
    epmult: float | None = None
    r_loop: float | None = None
    n_image: int | None = None
    ib_win: int | None = None

    if rt2 not in ("G", "B", "S"):
        raw_ty = cur.next_raw()
        beam_type = _pad_run_type_five(_extract_quoted_token(raw_ty))
        epmult = cur.next_float()
        r_loop = cur.next_float()
        n_image = cur.next_int()
        ib_win = cur.next_int()

    if len(rt) < 4:
        rt = (rt + "    ")[:4]
    rt_list = list(rt.ljust(5)[:5])
    if rt_list[3] not in ("R", "X"):
        rt_list[3] = "R"
    rt = "".join(rt_list)

    if not cur.exhausted():
        raise ValueError(f"read_bell: unexpected trailing tokens: {cur.trailing_preview()!r}")

    params = BellhopBeamParams(
        run_type=rt,
        nbeams=nbeams_eff,
        ibeam=ibeam,
        alpha_deg=alpha,
        nbeta=nbeta,
        beta_deg=beta_deg,
        deltas_m=deltas,
        box_z_m=box_z,
        box_r_km=box_r,
        box_x_km=box_x,
        box_y_km=box_y,
        beam_type_ms=beam_type,
        epmult=epmult,
        r_loop=r_loop,
        n_image=n_image,
        ib_win=ib_win,
    )
    return params, len(lines)


[docs] @dataclass(frozen=True) class BellhopBeamParams: """Bellhop control block from ``read_bell.m`` (partial — omits unused globals).""" run_type: str nbeams: int ibeam: int | None alpha_deg: np.ndarray nbeta: int | None beta_deg: np.ndarray | None deltas_m: float box_z_m: float box_r_km: float | None box_x_km: float | None box_y_km: float | None beam_type_ms: str | None epmult: float | None r_loop: float | None n_image: int | None ib_win: int | None
[docs] @dataclass(frozen=True) class BellhopEnvTail: """Post-core Bellhop 2D: ``readszrz`` + ``readr`` + ``read_bell``.""" source_z_m: np.ndarray receiver_z_m: np.ndarray receiver_range_km: np.ndarray beam: BellhopBeamParams
[docs] @dataclass(frozen=True) class Bellhop3DEnvTail: """Bellhop3D tail: ``readsxsy``, ``readszrz``, ``readr``, ``readtheta``, ``read_bell``.""" source_x_m: np.ndarray source_y_m: np.ndarray source_z_m: np.ndarray receiver_z_m: np.ndarray receiver_range_km: np.ndarray receiver_theta_deg: np.ndarray beam: BellhopBeamParams
[docs] @dataclass(frozen=True) class ParsedEnvBellhop: """Full 2D Bellhop ``.env``: SSP/core plus Bellhop tail (sources, receivers, beam block).""" core: EnvCore tail: BellhopEnvTail
[docs] @dataclass(frozen=True) class ParsedEnvBellhop3D: """Full Bellhop3D ``.env``: core plus 3D source/receiver grids and beam block.""" core: EnvCore tail: Bellhop3DEnvTail
[docs] def parse_env_bellhop(text: str) -> ParsedEnvBellhop: """Parse a 2D Bellhop ``.env`` (Matlab ``read_env`` with ``model='BELLHOP'``).""" lines = text.splitlines() core, i = _parse_env_core_lines(lines, 0) top_opt = (core.bdry.top_opt + " ")[:7] sd, _, i = parse_readvector_lines(lines, i) rd, _, i = parse_readvector_lines(lines, i) rr, _, i = parse_readvector_lines(lines, i) rmax_km = float(rr[-1]) depth_top = float(core.bdry.top.depth) if core.bdry.top.depth is not None else 0.0 depth_bot = float(core.bdry.bot.depth) if core.bdry.bot.depth is not None else 0.0 beam, _end = _parse_read_bell( lines, i, top_opt_7=top_opt, freq_hz=core.freq_hz, depth_bot_m=depth_bot, depth_top_m=depth_top, rmax_km=rmax_km, model="BELLHOP", ) tail = BellhopEnvTail( source_z_m=sd, receiver_z_m=rd, receiver_range_km=rr, beam=beam, ) return ParsedEnvBellhop(core=core, tail=tail)
[docs] def parse_env_bellhop3d(text: str) -> ParsedEnvBellhop3D: """Parse a Bellhop3D ``.env`` (Matlab ``read_env`` with ``model='BELLHOP3D'``).""" lines = text.splitlines() core, i = _parse_env_core_lines(lines, 0) top_opt = (core.bdry.top_opt + " ")[:7] sx_km, _, i = parse_readvector_lines(lines, i) sy_km, _, i = parse_readvector_lines(lines, i) sx_m = sx_km * 1000.0 sy_m = sy_km * 1000.0 sd, _, i = parse_readvector_lines(lines, i) rd, _, i = parse_readvector_lines(lines, i) rr, _, i = parse_readvector_lines(lines, i) theta, _, i = parse_readvector_lines(lines, i) rmax_km = float(rr[-1]) depth_top = float(core.bdry.top.depth) if core.bdry.top.depth is not None else 0.0 depth_bot = float(core.bdry.bot.depth) if core.bdry.bot.depth is not None else 0.0 beam, _end = _parse_read_bell( lines, i, top_opt_7=top_opt, freq_hz=core.freq_hz, depth_bot_m=depth_bot, depth_top_m=depth_top, rmax_km=rmax_km, model="BELLHOP3D", ) tail = Bellhop3DEnvTail( source_x_m=sx_m, source_y_m=sy_m, source_z_m=sd, receiver_z_m=rd, receiver_range_km=rr, receiver_theta_deg=theta, beam=beam, ) return ParsedEnvBellhop3D(core=core, tail=tail)
[docs] def parse_env_bellhop_bytes(data: bytes, *, encoding: str = "utf-8") -> ParsedEnvBellhop: """Like :func:`parse_env_bellhop` after decoding ``data`` with ``encoding``.""" return parse_env_bellhop(data.decode(encoding, errors="replace"))
[docs] def parse_env_bellhop3d_bytes(data: bytes, *, encoding: str = "utf-8") -> ParsedEnvBellhop3D: """Like :func:`parse_env_bellhop3d` after decoding ``data`` with ``encoding``.""" return parse_env_bellhop3d(data.decode(encoding, errors="replace"))