"""Bellhop / Bellhop3D ``.env`` tail parsing (Matlab ``read_env.m`` + ``read_bell.m``)."""
from __future__ import annotations
import math
from dataclasses import dataclass
from typing import Literal
import numpy as np
from at_py.readwrite.env import EnvCore, _parse_env_core_lines
from at_py.readwrite.vector import parse_readvector_lines
def _pad_run_type_five(quoted_inner: str) -> str:
"""Pad or truncate run-type to 5 characters (Matlab ``read_bell`` / Fortran width)."""
if len(quoted_inner) < 5:
return (quoted_inner + " ")[:5]
return quoted_inner[:5]
def _apply_read_bell_run_type_rules(rt: str) -> str:
"""Apply ``read_bell`` normalization to the 5-character run type."""
ch = list(rt.ljust(5)[:5])
if ch[0] == "a":
ch[0] = "A"
c = ch[1]
if c not in ("C", "R", "S", "B"):
ch[1] = "G"
return "".join(ch)
def _build_numeric_tokens(lines: list[str], start: int) -> list[str]:
"""Collect whitespace tokens from ``lines[start:]``, skipping ``!`` comments and ``/``."""
toks: list[str] = []
for j in range(start, len(lines)):
s = lines[j].strip()
if not s:
continue
if "!" in s:
s = s.split("!", 1)[0].strip()
if not s:
continue
for w in s.split():
if w == "/":
continue
toks.append(w)
return toks
class _TokenCursor:
"""Sequential tokens for ``fscanf``-like numeric reads after the run-type line."""
__slots__ = ("_t", "k")
def __init__(self, tokens: list[str]) -> None:
"""Token stream for Bellhop tail parsing (see :func:`_parse_read_bell`)."""
self._t = tokens
self.k = 0
def _peek(self) -> str | None:
"""Return next token without advancing, or ``None`` at EOF."""
return self._t[self.k] if self.k < len(self._t) else None
def exhausted(self) -> bool:
"""True when no tokens remain."""
return self.k >= len(self._t)
def trailing_preview(self, max_len: int = 120) -> str:
"""Debug helper: joined unread tokens, truncated to ``max_len`` chars."""
return " ".join(self._t[self.k :])[:max_len]
def next_raw(self) -> str:
"""Pop and return the next token string."""
if self.k >= len(self._t):
raise ValueError("unexpected end of token stream in read_bell")
t = self._t[self.k]
self.k += 1
return t
def next_float(self) -> float:
"""Pop next token and parse as ``float``."""
t = self.next_raw()
try:
return float(t)
except ValueError as e:
raise ValueError(f"expected float, got {t!r}") from e
def next_int(self) -> int:
"""Pop next token as float, then round to ``int`` (Matlab-style)."""
return int(round(self.next_float()))
def _extract_quoted_token(tok: str) -> str:
"""Return inner text of a single-quoted token (e.g. ``'A'`` → ``A``)."""
parts = tok.split("'")
if len(parts) < 3:
raise ValueError(f"expected quoted token, got {tok!r}")
return parts[1]
def _quoted_inner_line(line: str) -> str:
"""First quoted substring from a full env line."""
parts = line.split("'")
if len(parts) < 3:
raise ValueError(f"expected quoted string in line: {line!r}")
return parts[1]
def _read_angle_fan(tok: _TokenCursor, n: int) -> tuple[np.ndarray, int]:
"""``alpha`` / ``beta`` block: Matlab ``fscanf`` then ``linspace`` when ``N > 2``."""
if n == 1:
return np.array([tok.next_float()], dtype=np.float64), 1
a = tok.next_float()
b = tok.next_float()
if n > 2:
arr = np.linspace(a, b, n, dtype=np.float64)
else:
arr = np.array([a, b], dtype=np.float64)[:n]
n_out = int(arr.size)
# Full 360° duplicate-end removal (Matlab read_bell)
if n_out > 1 and abs((arr[-1] - arr[0]) % 360.0) < 10.0 * np.finfo(float).eps:
n_out -= 1
arr = arr[:-1]
return arr, n_out
def _parse_read_bell(
lines: list[str],
start: int,
*,
top_opt_7: str,
freq_hz: float,
depth_bot_m: float,
depth_top_m: float,
rmax_km: float,
model: Literal["BELLHOP", "BELLHOP3D"],
) -> tuple[BellhopBeamParams, int]:
"""Parse ``read_bell`` starting at ``lines[start]`` (run-type line). Returns next line index."""
if start >= len(lines):
raise ValueError("read_bell: missing run-type line")
run_line = lines[start]
inner = _quoted_inner_line(run_line)
rt = _apply_read_bell_run_type_rules(_pad_run_type_five(inner))
toks = _build_numeric_tokens(lines, start + 1)
cur = _TokenCursor(toks)
nbeams = cur.next_int()
if nbeams == 0:
nbeams = max(math.ceil(0.3 * 1000.0 * rmax_km * freq_hz / 1500.0), 300)
ibeam: int | None = None
if len(top_opt_7) > 5 and top_opt_7[5:6] == "I":
ibeam = cur.next_int()
alpha, nbeams_eff = _read_angle_fan(cur, nbeams)
nbeta: int | None = None
beta_deg: np.ndarray | None = None
if model == "BELLHOP3D":
nbeta_in = cur.next_int()
beta_deg, nbeta = _read_angle_fan(cur, nbeta_in)
else:
nbeta = None
beta_deg = None
deltas = cur.next_float()
if model == "BELLHOP":
box_z = cur.next_float()
box_r = cur.next_float()
box_x = box_y = None
if deltas == 0.0:
deltas = (depth_bot_m - depth_top_m) / 10.0
else:
box_x = cur.next_float()
box_y = cur.next_float()
box_z = cur.next_float()
box_r = None
if deltas == 0.0:
deltas = (depth_bot_m - depth_top_m) / 10.0
rt2 = rt[1:2]
beam_type: str | None = None
epmult: float | None = None
r_loop: float | None = None
n_image: int | None = None
ib_win: int | None = None
if rt2 not in ("G", "B", "S"):
raw_ty = cur.next_raw()
beam_type = _pad_run_type_five(_extract_quoted_token(raw_ty))
epmult = cur.next_float()
r_loop = cur.next_float()
n_image = cur.next_int()
ib_win = cur.next_int()
if len(rt) < 4:
rt = (rt + " ")[:4]
rt_list = list(rt.ljust(5)[:5])
if rt_list[3] not in ("R", "X"):
rt_list[3] = "R"
rt = "".join(rt_list)
if not cur.exhausted():
raise ValueError(f"read_bell: unexpected trailing tokens: {cur.trailing_preview()!r}")
params = BellhopBeamParams(
run_type=rt,
nbeams=nbeams_eff,
ibeam=ibeam,
alpha_deg=alpha,
nbeta=nbeta,
beta_deg=beta_deg,
deltas_m=deltas,
box_z_m=box_z,
box_r_km=box_r,
box_x_km=box_x,
box_y_km=box_y,
beam_type_ms=beam_type,
epmult=epmult,
r_loop=r_loop,
n_image=n_image,
ib_win=ib_win,
)
return params, len(lines)
[docs]
@dataclass(frozen=True)
class BellhopBeamParams:
"""Bellhop control block from ``read_bell.m`` (partial — omits unused globals)."""
run_type: str
nbeams: int
ibeam: int | None
alpha_deg: np.ndarray
nbeta: int | None
beta_deg: np.ndarray | None
deltas_m: float
box_z_m: float
box_r_km: float | None
box_x_km: float | None
box_y_km: float | None
beam_type_ms: str | None
epmult: float | None
r_loop: float | None
n_image: int | None
ib_win: int | None
[docs]
@dataclass(frozen=True)
class BellhopEnvTail:
"""Post-core Bellhop 2D: ``readszrz`` + ``readr`` + ``read_bell``."""
source_z_m: np.ndarray
receiver_z_m: np.ndarray
receiver_range_km: np.ndarray
beam: BellhopBeamParams
[docs]
@dataclass(frozen=True)
class Bellhop3DEnvTail:
"""Bellhop3D tail: ``readsxsy``, ``readszrz``, ``readr``, ``readtheta``, ``read_bell``."""
source_x_m: np.ndarray
source_y_m: np.ndarray
source_z_m: np.ndarray
receiver_z_m: np.ndarray
receiver_range_km: np.ndarray
receiver_theta_deg: np.ndarray
beam: BellhopBeamParams
[docs]
@dataclass(frozen=True)
class ParsedEnvBellhop:
"""Full 2D Bellhop ``.env``: SSP/core plus Bellhop tail (sources, receivers, beam block)."""
core: EnvCore
tail: BellhopEnvTail
[docs]
@dataclass(frozen=True)
class ParsedEnvBellhop3D:
"""Full Bellhop3D ``.env``: core plus 3D source/receiver grids and beam block."""
core: EnvCore
tail: Bellhop3DEnvTail
[docs]
def parse_env_bellhop(text: str) -> ParsedEnvBellhop:
"""Parse a 2D Bellhop ``.env`` (Matlab ``read_env`` with ``model='BELLHOP'``)."""
lines = text.splitlines()
core, i = _parse_env_core_lines(lines, 0)
top_opt = (core.bdry.top_opt + " ")[:7]
sd, _, i = parse_readvector_lines(lines, i)
rd, _, i = parse_readvector_lines(lines, i)
rr, _, i = parse_readvector_lines(lines, i)
rmax_km = float(rr[-1])
depth_top = float(core.bdry.top.depth) if core.bdry.top.depth is not None else 0.0
depth_bot = float(core.bdry.bot.depth) if core.bdry.bot.depth is not None else 0.0
beam, _end = _parse_read_bell(
lines,
i,
top_opt_7=top_opt,
freq_hz=core.freq_hz,
depth_bot_m=depth_bot,
depth_top_m=depth_top,
rmax_km=rmax_km,
model="BELLHOP",
)
tail = BellhopEnvTail(
source_z_m=sd,
receiver_z_m=rd,
receiver_range_km=rr,
beam=beam,
)
return ParsedEnvBellhop(core=core, tail=tail)
[docs]
def parse_env_bellhop3d(text: str) -> ParsedEnvBellhop3D:
"""Parse a Bellhop3D ``.env`` (Matlab ``read_env`` with ``model='BELLHOP3D'``)."""
lines = text.splitlines()
core, i = _parse_env_core_lines(lines, 0)
top_opt = (core.bdry.top_opt + " ")[:7]
sx_km, _, i = parse_readvector_lines(lines, i)
sy_km, _, i = parse_readvector_lines(lines, i)
sx_m = sx_km * 1000.0
sy_m = sy_km * 1000.0
sd, _, i = parse_readvector_lines(lines, i)
rd, _, i = parse_readvector_lines(lines, i)
rr, _, i = parse_readvector_lines(lines, i)
theta, _, i = parse_readvector_lines(lines, i)
rmax_km = float(rr[-1])
depth_top = float(core.bdry.top.depth) if core.bdry.top.depth is not None else 0.0
depth_bot = float(core.bdry.bot.depth) if core.bdry.bot.depth is not None else 0.0
beam, _end = _parse_read_bell(
lines,
i,
top_opt_7=top_opt,
freq_hz=core.freq_hz,
depth_bot_m=depth_bot,
depth_top_m=depth_top,
rmax_km=rmax_km,
model="BELLHOP3D",
)
tail = Bellhop3DEnvTail(
source_x_m=sx_m,
source_y_m=sy_m,
source_z_m=sd,
receiver_z_m=rd,
receiver_range_km=rr,
receiver_theta_deg=theta,
beam=beam,
)
return ParsedEnvBellhop3D(core=core, tail=tail)
[docs]
def parse_env_bellhop_bytes(data: bytes, *, encoding: str = "utf-8") -> ParsedEnvBellhop:
"""Like :func:`parse_env_bellhop` after decoding ``data`` with ``encoding``."""
return parse_env_bellhop(data.decode(encoding, errors="replace"))
[docs]
def parse_env_bellhop3d_bytes(data: bytes, *, encoding: str = "utf-8") -> ParsedEnvBellhop3D:
"""Like :func:`parse_env_bellhop3d` after decoding ``data`` with ``encoding``."""
return parse_env_bellhop3d(data.decode(encoding, errors="replace"))