Source code for at_py.readwrite.env

from __future__ import annotations

from dataclasses import dataclass

import numpy as np

from at_py.readwrite.attenuation import FrancoisGarrisonParams, crci
from at_py.readwrite.vector import parse_readvector_lines


class _LineIter:
    """Forward-only line iterator with 1-based-style `next` for env parsing."""

    __slots__ = ("_lines", "i")

    def __init__(self, lines: list[str], start: int = 0) -> None:
        """Start iteration at line index ``start``."""
        self._lines = lines
        self.i = start

    def next(self) -> str:
        """Return the next non-consumed line and advance (raises if EOF)."""
        if self.i >= len(self._lines):
            raise ValueError("unexpected end of env file")
        s = self._lines[self.i]
        self.i += 1
        return s


def _extract_quoted(s: str) -> str:
    """Return the first single-quoted substring in ``s`` (Fortran env lines)."""
    parts = s.split("'")
    if len(parts) < 3:
        raise ValueError(f"expected quoted string in line: {s!r}")
    return parts[1]


@dataclass(frozen=True)
class HalfspaceRaw:
    """Raw P/S wave speeds and density coefficients for an acoustic half-space line."""

    alphaR: float
    alphaI: float
    betaR: float
    betaI: float
    rho: float


@dataclass(frozen=True)
class Halfspace:
    """Acoustic half-space boundary: boundary code, complex speeds, density, optional raw line."""

    bc: str
    cp: complex
    cs: complex
    rho: float
    depth: float | None = None
    raw: HalfspaceRaw | None = None


@dataclass(frozen=True)
class SSPRawLayer:
    """Per-medium raw SSP mesh: depth and viscoelastic parameters at each point."""

    z: np.ndarray
    alphaR: np.ndarray
    alphaI: np.ndarray
    betaR: np.ndarray
    betaI: np.ndarray
    rho: np.ndarray


@dataclass(frozen=True)
class SSP:
    """Stacked sound-speed profile across media (grids, densities, optional Francois–Garrison)."""

    n_media: int
    n_mesh: np.ndarray
    sigma: np.ndarray
    depth: np.ndarray
    z: np.ndarray
    c: np.ndarray
    cs: np.ndarray
    rho: np.ndarray
    npts: np.ndarray
    raw: list[SSPRawLayer]
    cz: np.ndarray | None = None
    fg: FrancoisGarrisonParams | None = None


@dataclass(frozen=True)
class Bdry:
    """Top/bottom boundary options, SSP type, attenuation unit, and half-space structs."""

    top_opt: str
    bot_opt: str
    ssp_type: str
    atten_unit: str
    top: Halfspace
    bot: Halfspace
    top_rho_inside: float
    top_c_inside: complex
    bot_rho_inside: float
    bot_c_inside: complex


[docs] @dataclass(frozen=True) class EnvCore: """Common ``.env`` header: title, frequency, and full SSP plus boundary section.""" title: str freq_hz: float ssp: SSP bdry: Bdry
[docs] @dataclass(frozen=True) class KrakenEnvTail: """Options after ``read_env_core`` for KRAKEN/SCOOTER/SPARC/BOUNCE (Matlab ``read_env.m``).""" c_low: float c_high: float r_max_km: float source_z: np.ndarray receiver_z: np.ndarray
[docs] @dataclass(frozen=True) class ParsedEnvKraken: """Full ``.env`` parse for the KRAKEN-family tail section.""" core: EnvCore tail: KrakenEnvTail
def _split_env_numeric_tokens(line: str) -> list[str]: """Split a Fortran env line, dropping ``!`` comments and a trailing ``/``.""" s = line.strip() if "!" in s: s = s.split("!", 1)[0].strip() parts: list[str] = [] for tok in s.split(): if tok == "/": break parts.append(tok) return parts def _floats_from_line_prefix(line: str, count: int) -> list[float]: """Parse leading floats on a line, stopping at ``!`` (Fortran-style comment).""" out: list[float] = [] for tok in line.split(): if tok.startswith("!"): break try: out.append(float(tok)) except ValueError: continue if len(out) >= count: break if len(out) < count: raise ValueError(f"expected at least {count} float(s) on line: {line!r}") return out[:count] def _parse_env_core_lines(lines: list[str], start: int = 0) -> tuple[EnvCore, int]: """Parse env core; return ``(EnvCore, next_line_index)`` (Matlab `read_env_core.m`).""" reader = _LineIter(lines, start) def next_line() -> str: """Forward to :meth:`_LineIter.next` (local alias for readability).""" return reader.next() # Defaults (Matlab globals) alphaR = 1500.0 betaR = 0.0 rhoR = 1.0 alphaI = 0.0 betaI = 0.0 title = _extract_quoted(next_line()) freq_hz = float(next_line().split()[0]) n_media = int(next_line().split()[0]) top_opt_line = next_line() top_opt = (_extract_quoted(top_opt_line) + " ")[:7] # deprecated '*' → '~' at position 5 (1-based) if len(top_opt) >= 5 and top_opt[4:5] == "*": top_opt = top_opt[:4] + "~" + top_opt[5:] ssp_type = top_opt[0:1] top_bc = top_opt[1:2] atten_unit = top_opt[2:4] fg: FrancoisGarrisonParams | None = None if len(top_opt) >= 4 and top_opt[3:4] == "F": t, s, ph, zb = [float(x) for x in next_line().split()[:4]] fg = FrancoisGarrisonParams(T=t, S=s, pH=ph, z_bar=zb) def read_halfspace(bc: str) -> Halfspace: """Parse one acoustic half-space line when ``bc == 'A'``.""" nonlocal alphaR, betaR, rhoR, alphaI, betaI if bc != "A": return Halfspace(bc=bc, cp=0.0 + 0.0j, cs=0.0 + 0.0j, rho=0.0) vals = [float(x) for x in _split_env_numeric_tokens(next_line())] # ztmp present but not stored for halfspaces in Matlab (used for CRCI depth in Fortran) ztmp_hs = float(vals[0]) if len(vals) >= 1 else 0.0 if len(vals) >= 2: alphaR = vals[1] if len(vals) >= 3: betaR = vals[2] if len(vals) >= 4: rhoR = vals[3] if len(vals) >= 5: alphaI = vals[4] if len(vals) >= 6: betaI = vals[5] cp = crci(alphaR, alphaI, freq_hz=freq_hz, atten_unit=atten_unit, fg=fg, depth_m=ztmp_hs) cs = crci(betaR, betaI, freq_hz=freq_hz, atten_unit=atten_unit, fg=fg, depth_m=ztmp_hs) return Halfspace( bc=bc, cp=cp, cs=cs, rho=rhoR, raw=HalfspaceRaw(alphaR=alphaR, alphaI=alphaI, betaR=betaR, betaI=betaI, rho=rhoR), ) top_hs = read_halfspace(top_bc) # Read SSP layers n_mesh = np.zeros(n_media, dtype=np.int32) sigma = np.zeros(n_media, dtype=np.float64) depth = np.zeros(n_media + 1, dtype=np.float64) z_all: list[float] = [] c_all: list[complex] = [] cs_all: list[complex] = [] rho_all: list[float] = [] npts = np.zeros(n_media, dtype=np.int32) raw_layers: list[SSPRawLayer] = [] loc = np.zeros(n_media, dtype=np.int32) n_first_acoustic = 0 n_last_acoustic = 0 for medium in range(n_media): if medium == 0: loc[medium] = 0 else: loc[medium] = int(loc[medium - 1] + npts[medium - 1]) parts = next_line().split() n_mesh[medium] = int(parts[0]) sigma[medium] = float(parts[1]) depth[medium + 1] = float(parts[2]) layer_z: list[float] = [] layer_alphaR: list[float] = [] layer_alphaI: list[float] = [] layer_betaR: list[float] = [] layer_betaI: list[float] = [] layer_rho: list[float] = [] has_shear = False target = depth[medium + 1] for _ in range(10_000_000): row = _split_env_numeric_tokens(next_line()) if not row: continue ztmp = float(row[0]) if len(row) >= 2: alphaR = float(row[1]) if len(row) >= 3: betaR = float(row[2]) if len(row) >= 4: rhoR = float(row[3]) if len(row) >= 5: alphaI = float(row[4]) if len(row) >= 6: betaI = float(row[5]) cp = crci(alphaR, alphaI, freq_hz=freq_hz, atten_unit=atten_unit, fg=fg, depth_m=ztmp) css = crci(betaR, betaI, freq_hz=freq_hz, atten_unit=atten_unit, fg=fg, depth_m=ztmp) z_all.append(ztmp) c_all.append(cp) cs_all.append(css) rho_all.append(rhoR) layer_z.append(ztmp) layer_alphaR.append(alphaR) layer_alphaI.append(alphaI) layer_betaR.append(betaR) layer_betaI.append(betaI) layer_rho.append(rhoR) if betaR != 0.0: has_shear = True if abs(ztmp - target) <= 1e-7: if medium == 0: depth[0] = z_all[0] break else: raise ValueError("SSP layer parse exceeded iteration limit") npts[medium] = len(layer_z) raw_layers.append( SSPRawLayer( z=np.asarray(layer_z, dtype=np.float64), alphaR=np.asarray(layer_alphaR, dtype=np.float64), alphaI=np.asarray(layer_alphaI, dtype=np.float64), betaR=np.asarray(layer_betaR, dtype=np.float64), betaI=np.asarray(layer_betaI, dtype=np.float64), rho=np.asarray(layer_rho, dtype=np.float64), ) ) if n_mesh[medium] == 0: C = betaR if betaR > 0.0 else alphaR deltaz = 0.05 * C / freq_hz n_mesh[medium] = int(round((depth[medium + 1] - depth[medium]) / deltaz)) n_mesh[medium] = max(int(n_mesh[medium]), 10) if not has_shear: if n_first_acoustic == 0: n_first_acoustic = medium + 1 # 1-based like Matlab n_last_acoustic = medium + 1 if medium == 0 and len(z_all) >= 2: hv = np.diff(np.asarray(z_all, dtype=np.float64)) cz = np.diff(np.asarray(c_all, dtype=np.complex128)) / hv else: cz = None bot_opt_line = next_line() bot_opt = (_extract_quoted(bot_opt_line) + " ")[:3] if len(bot_opt) >= 2 and bot_opt[1:2] == "*": bot_opt = bot_opt[:1] + "~" + bot_opt[2:] bot_bc = bot_opt[0:1] bot_hs = read_halfspace(bot_bc) # Boundary depths top_hs = Halfspace(**{**top_hs.__dict__, "depth": float(depth[0])}) bot_hs = Halfspace(**{**bot_hs.__dict__, "depth": float(depth[-1])}) # rho/c just inside boundaries if n_first_acoustic == 0 or n_last_acoustic == 0: raise ValueError("no acoustic medium found") top_inside_idx = n_first_acoustic - 1 bot_inside_idx = int(loc[n_last_acoustic - 1] + npts[n_last_acoustic - 1] - 1) z_arr = np.asarray(z_all, dtype=np.float64) c_arr = np.asarray(c_all, dtype=np.complex128) cs_arr = np.asarray(cs_all, dtype=np.complex128) rho_arr = np.asarray(rho_all, dtype=np.float64) ssp = SSP( n_media=n_media, n_mesh=n_mesh, sigma=sigma, depth=depth, z=z_arr, c=c_arr, cs=cs_arr, rho=rho_arr, npts=npts, raw=raw_layers, cz=cz, fg=fg, ) bdry = Bdry( top_opt=top_opt, bot_opt=bot_opt, ssp_type=ssp_type, atten_unit=atten_unit, top=top_hs, bot=bot_hs, top_rho_inside=float(rho_arr[top_inside_idx]), top_c_inside=complex(c_arr[top_inside_idx]), bot_rho_inside=float(rho_arr[bot_inside_idx]), bot_c_inside=complex(c_arr[bot_inside_idx]), ) return EnvCore(title=title, freq_hz=freq_hz, ssp=ssp, bdry=bdry), reader.i
[docs] def parse_env_core(text: str) -> EnvCore: """Parse the core `.env` structure (port of Matlab `read_env_core.m`). This is intended as a faithful, **filesystem-free** parser. It is line-oriented, matching Matlab's `fscanf` + `fgetl` pattern. Lines after the core block (e.g. KRAKEN ``CMIN``/``CMAX``, ``RMAX``, source/receiver depths) are ignored. Use :func:`parse_env_kraken` to read those. """ lines = text.splitlines() env, _ = _parse_env_core_lines(lines, 0) return env
[docs] def parse_env_core_bytes(data: bytes, *, encoding: str = "utf-8") -> EnvCore: """Like :func:`parse_env_core` after decoding ``data`` with ``encoding``.""" return parse_env_core(data.decode(encoding, errors="replace"))
[docs] def parse_env_kraken(text: str) -> ParsedEnvKraken: """Parse a full ``.env`` including the KRAKEN-family tail (Matlab ``read_env``). After :func:`read_env_core` Matlab reads ``cInt`` (CMIN/CMAX), ``RMax``, then :func:`at_py.readwrite.vector.parse_readvector_lines` twice for source and receiver depths (``readszrz``). Applies to models using that block: ``KRAKEN``, ``SCOOTER``, ``SPARC``, ``BOUNCE``, etc. (see Matlab ``read_env.m``). """ lines = text.splitlines() core, i = _parse_env_core_lines(lines, 0) need = 6 if len(lines) - i < need: raise ValueError( "expected KRAKEN-family tail after env core: CMIN/CMAX line, RMAX line, NSD+SD, NRD+RD" ) c_low, c_high = _floats_from_line_prefix(lines[i], 2) r_max_km = _floats_from_line_prefix(lines[i + 1], 1)[0] sd, _nsd, j = parse_readvector_lines(lines, i + 2) rd, _nrd, k = parse_readvector_lines(lines, j) for tail_i in range(k, len(lines)): if lines[tail_i].strip(): raise ValueError(f"unexpected line after KRAKEN env tail: {lines[tail_i]!r}") tail = KrakenEnvTail( c_low=c_low, c_high=c_high, r_max_km=r_max_km, source_z=sd, receiver_z=rd, ) return ParsedEnvKraken(core=core, tail=tail)
[docs] def parse_env_kraken_bytes(data: bytes, *, encoding: str = "utf-8") -> ParsedEnvKraken: """Like :func:`parse_env_kraken` after decoding ``data`` with ``encoding``.""" return parse_env_kraken(data.decode(encoding, errors="replace"))