Source code for at_py.readwrite.ati

"""Bellhop altimetry ``.ati`` reader (port of Matlab ``readati.m`` core file I/O)."""

from __future__ import annotations

from dataclasses import dataclass

import numpy as np


def _quoted_inner(line: str) -> str:
    """First single-quoted substring from an ``.ati`` header line."""
    parts = line.split("'")
    if len(parts) < 3:
        raise ValueError(f"expected quoted altimetry type on line: {line!r}")
    return parts[1]


def _pad_ati_type(inner: str) -> str:
    """Normalize type token to two characters (Matlab width)."""
    return (inner + "  ")[:2]


def _collect_floats_after(lines: list[str], start: int) -> list[float]:
    """Scan lines from ``start`` for comma/space-separated floats (skip ``!``, ``/``)."""
    out: list[float] = []
    for j in range(start, len(lines)):
        s = lines[j].strip()
        if not s:
            continue
        if "!" in s:
            s = s.split("!", 1)[0].strip()
        for tok in s.replace(",", " ").split():
            if tok == "/":
                continue
            try:
                out.append(float(tok))
            except ValueError:
                continue
    return out


[docs] @dataclass(frozen=True) class AltimetryRead: """2D sea-surface altimetry from an ``.ati`` file (ranges in **m** after km→m).""" interp_type: str range_m: np.ndarray depth_m: np.ndarray tangent_u: np.ndarray normal_u: np.ndarray segment_length_m: np.ndarray segment_curvature: np.ndarray tangent_node_u: np.ndarray | None normal_node_u: np.ndarray | None
[docs] def parse_ati( text: str, *, min_depth_m: float | None = None, ) -> AltimetryRead: """Parse a 2D Bellhop ``.ati`` file (Matlab ``readati``). Ranges in the file are **km**; returned ``range_m`` is in **m**. Endpoints use ±1e50 m as in Matlab. If ``min_depth_m`` is set (e.g. depth of the first tabulated SSP point), any altimetry **shallower** than that raises ``ValueError`` (Matlab: altimetry above first SSP). """ lines = text.splitlines() i = 0 while i < len(lines) and not lines[i].strip(): i += 1 if i >= len(lines): raise ValueError("empty .ati file") ati_head = _pad_ati_type(_quoted_inner(lines[i])) i += 1 interp = ati_head[0:1] if interp not in ("L", "C"): raise ValueError(f"unknown altimetry interpolation type: {interp!r}") while i < len(lines) and not lines[i].strip(): i += 1 if i >= len(lines): raise ValueError(".ati: missing point count") n_line = lines[i].strip() if "!" in n_line: n_line = n_line.split("!", 1)[0].strip() n_pts = int(float(n_line.split()[0].rstrip(","))) i += 1 floats = _collect_floats_after(lines, i) need = 2 * n_pts if len(floats) < need: raise ValueError(f".ati: expected {need} floats for {n_pts} points, got {len(floats)}") raw_r_km = np.asarray(floats[0::2][:n_pts], dtype=np.float64) raw_z = np.asarray(floats[1::2][:n_pts], dtype=np.float64) if min_depth_m is not None and np.any(raw_z < min_depth_m): raise ValueError("altimetry goes above first tabulated SSP value") r_m = np.empty(n_pts + 2, dtype=np.float64) z = np.empty(n_pts + 2, dtype=np.float64) r_m[0] = -1e50 r_m[1:-1] = raw_r_km * 1000.0 r_m[-1] = 1e50 z[0] = raw_z[0] z[1:-1] = raw_z z[-1] = raw_z[-1] n_ext = n_pts + 2 dr = np.diff(r_m) dz = np.diff(z) seg_len = np.sqrt(dr**2 + dz**2) tx = dr / seg_len tz = dz / seg_len t_top = np.vstack([tx, tz]) # readati.m: outward normal (water below interface) n_top = np.vstack([t_top[1, :], -t_top[0, :]]) n_seg = n_ext - 1 curv_seg = np.zeros(n_seg, dtype=np.float64) t_node: np.ndarray | None = None n_node: np.ndarray | None = None if interp == "C": t_node = np.zeros((2, n_ext), dtype=np.float64) n_node = np.zeros((2, n_ext), dtype=np.float64) for ii in range(1, n_ext - 1): t_node[:, ii] = 0.5 * (t_top[:, ii - 1] + t_top[:, ii]) t_node[:, 0] = [1.0, 0.0] t_node[:, -1] = [1.0, 0.0] n_node[0, :] = t_node[1, :] n_node[1, :] = -t_node[0, :] phi = np.arctan2(t_node[1, :], t_node[0, :]) curv_seg = np.diff(phi) / seg_len return AltimetryRead( interp_type=interp, range_m=r_m, depth_m=z, tangent_u=t_top, normal_u=n_top, segment_length_m=seg_len, segment_curvature=curv_seg, tangent_node_u=t_node, normal_node_u=n_node, )
[docs] def parse_ati_bytes( data: bytes, *, encoding: str = "utf-8", min_depth_m: float | None = None, ) -> AltimetryRead: """Like :func:`parse_ati` after decoding ``data`` with ``encoding``.""" return parse_ati(data.decode(encoding, errors="replace"), min_depth_m=min_depth_m)
def _fmt_ati_float(x: float) -> str: """Format a float for stable ``.ati`` round-trip.""" return f"{x:.17g}"
[docs] def format_ati( interp_type: str, range_km: np.ndarray, depth_m: np.ndarray, ) -> str: """Format a 2D Bellhop ``.ati`` file (inverse of :func:`parse_ati` for tabulated nodes). ``range_km`` and ``depth_m`` are internal file points only (km / meters), not the synthetic ±1e50 endpoints appended on read. """ ch = interp_type.strip().upper()[:1] if ch not in ("L", "C"): raise ValueError(f"interp_type must start with L or C; got {interp_type!r}") rk = np.asarray(range_km, dtype=np.float64).reshape(-1) zd = np.asarray(depth_m, dtype=np.float64).reshape(-1) if rk.size != zd.size or rk.size < 1: raise ValueError( "range_km and depth_m must be same-length 1D arrays with at least one point" ) lines: list[str] = [f"'{ch}'", str(int(rk.size))] for i in range(rk.size): lines.append(f" {_fmt_ati_float(float(rk[i]))} {_fmt_ati_float(float(zd[i]))}") return "\n".join(lines) + "\n"
[docs] def format_ati_bytes( interp_type: str, range_km: np.ndarray, depth_m: np.ndarray, *, encoding: str = "utf-8", ) -> bytes: """UTF-8 (or other) bytes for :func:`format_ati`.""" return format_ati(interp_type, range_km, depth_m).encode(encoding)
[docs] def format_ati_from_read(a: AltimetryRead) -> str: """Serialize altimetry using internal nodes from :class:`AltimetryRead` (round-trip helper).""" rk = a.range_m[1:-1] / 1000.0 zd = a.depth_m[1:-1] return format_ati(a.interp_type, rk, zd)