"""Bellhop altimetry ``.ati`` reader (port of Matlab ``readati.m`` core file I/O)."""
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
def _quoted_inner(line: str) -> str:
"""First single-quoted substring from an ``.ati`` header line."""
parts = line.split("'")
if len(parts) < 3:
raise ValueError(f"expected quoted altimetry type on line: {line!r}")
return parts[1]
def _pad_ati_type(inner: str) -> str:
"""Normalize type token to two characters (Matlab width)."""
return (inner + " ")[:2]
def _collect_floats_after(lines: list[str], start: int) -> list[float]:
"""Scan lines from ``start`` for comma/space-separated floats (skip ``!``, ``/``)."""
out: list[float] = []
for j in range(start, len(lines)):
s = lines[j].strip()
if not s:
continue
if "!" in s:
s = s.split("!", 1)[0].strip()
for tok in s.replace(",", " ").split():
if tok == "/":
continue
try:
out.append(float(tok))
except ValueError:
continue
return out
[docs]
@dataclass(frozen=True)
class AltimetryRead:
"""2D sea-surface altimetry from an ``.ati`` file (ranges in **m** after km→m)."""
interp_type: str
range_m: np.ndarray
depth_m: np.ndarray
tangent_u: np.ndarray
normal_u: np.ndarray
segment_length_m: np.ndarray
segment_curvature: np.ndarray
tangent_node_u: np.ndarray | None
normal_node_u: np.ndarray | None
[docs]
def parse_ati(
text: str,
*,
min_depth_m: float | None = None,
) -> AltimetryRead:
"""Parse a 2D Bellhop ``.ati`` file (Matlab ``readati``).
Ranges in the file are **km**; returned ``range_m`` is in **m**. Endpoints use ±1e50 m
as in Matlab.
If ``min_depth_m`` is set (e.g. depth of the first tabulated SSP point), any altimetry
**shallower** than that raises ``ValueError`` (Matlab: altimetry above first SSP).
"""
lines = text.splitlines()
i = 0
while i < len(lines) and not lines[i].strip():
i += 1
if i >= len(lines):
raise ValueError("empty .ati file")
ati_head = _pad_ati_type(_quoted_inner(lines[i]))
i += 1
interp = ati_head[0:1]
if interp not in ("L", "C"):
raise ValueError(f"unknown altimetry interpolation type: {interp!r}")
while i < len(lines) and not lines[i].strip():
i += 1
if i >= len(lines):
raise ValueError(".ati: missing point count")
n_line = lines[i].strip()
if "!" in n_line:
n_line = n_line.split("!", 1)[0].strip()
n_pts = int(float(n_line.split()[0].rstrip(",")))
i += 1
floats = _collect_floats_after(lines, i)
need = 2 * n_pts
if len(floats) < need:
raise ValueError(f".ati: expected {need} floats for {n_pts} points, got {len(floats)}")
raw_r_km = np.asarray(floats[0::2][:n_pts], dtype=np.float64)
raw_z = np.asarray(floats[1::2][:n_pts], dtype=np.float64)
if min_depth_m is not None and np.any(raw_z < min_depth_m):
raise ValueError("altimetry goes above first tabulated SSP value")
r_m = np.empty(n_pts + 2, dtype=np.float64)
z = np.empty(n_pts + 2, dtype=np.float64)
r_m[0] = -1e50
r_m[1:-1] = raw_r_km * 1000.0
r_m[-1] = 1e50
z[0] = raw_z[0]
z[1:-1] = raw_z
z[-1] = raw_z[-1]
n_ext = n_pts + 2
dr = np.diff(r_m)
dz = np.diff(z)
seg_len = np.sqrt(dr**2 + dz**2)
tx = dr / seg_len
tz = dz / seg_len
t_top = np.vstack([tx, tz])
# readati.m: outward normal (water below interface)
n_top = np.vstack([t_top[1, :], -t_top[0, :]])
n_seg = n_ext - 1
curv_seg = np.zeros(n_seg, dtype=np.float64)
t_node: np.ndarray | None = None
n_node: np.ndarray | None = None
if interp == "C":
t_node = np.zeros((2, n_ext), dtype=np.float64)
n_node = np.zeros((2, n_ext), dtype=np.float64)
for ii in range(1, n_ext - 1):
t_node[:, ii] = 0.5 * (t_top[:, ii - 1] + t_top[:, ii])
t_node[:, 0] = [1.0, 0.0]
t_node[:, -1] = [1.0, 0.0]
n_node[0, :] = t_node[1, :]
n_node[1, :] = -t_node[0, :]
phi = np.arctan2(t_node[1, :], t_node[0, :])
curv_seg = np.diff(phi) / seg_len
return AltimetryRead(
interp_type=interp,
range_m=r_m,
depth_m=z,
tangent_u=t_top,
normal_u=n_top,
segment_length_m=seg_len,
segment_curvature=curv_seg,
tangent_node_u=t_node,
normal_node_u=n_node,
)
[docs]
def parse_ati_bytes(
data: bytes,
*,
encoding: str = "utf-8",
min_depth_m: float | None = None,
) -> AltimetryRead:
"""Like :func:`parse_ati` after decoding ``data`` with ``encoding``."""
return parse_ati(data.decode(encoding, errors="replace"), min_depth_m=min_depth_m)
def _fmt_ati_float(x: float) -> str:
"""Format a float for stable ``.ati`` round-trip."""
return f"{x:.17g}"