from __future__ import annotations
from dataclasses import dataclass
import numpy as np
from at_py.readwrite.attenuation import FrancoisGarrisonParams, crci
from at_py.readwrite.vector import parse_readvector_lines
class _LineIter:
"""Forward-only line iterator with 1-based-style `next` for env parsing."""
__slots__ = ("_lines", "i")
def __init__(self, lines: list[str], start: int = 0) -> None:
"""Start iteration at line index ``start``."""
self._lines = lines
self.i = start
def next(self) -> str:
"""Return the next non-consumed line and advance (raises if EOF)."""
if self.i >= len(self._lines):
raise ValueError("unexpected end of env file")
s = self._lines[self.i]
self.i += 1
return s
def _extract_quoted(s: str) -> str:
"""Return the first single-quoted substring in ``s`` (Fortran env lines)."""
parts = s.split("'")
if len(parts) < 3:
raise ValueError(f"expected quoted string in line: {s!r}")
return parts[1]
@dataclass(frozen=True)
class HalfspaceRaw:
"""Raw P/S wave speeds and density coefficients for an acoustic half-space line."""
alphaR: float
alphaI: float
betaR: float
betaI: float
rho: float
@dataclass(frozen=True)
class Halfspace:
"""Acoustic half-space boundary: boundary code, complex speeds, density, optional raw line."""
bc: str
cp: complex
cs: complex
rho: float
depth: float | None = None
raw: HalfspaceRaw | None = None
@dataclass(frozen=True)
class SSPRawLayer:
"""Per-medium raw SSP mesh: depth and viscoelastic parameters at each point."""
z: np.ndarray
alphaR: np.ndarray
alphaI: np.ndarray
betaR: np.ndarray
betaI: np.ndarray
rho: np.ndarray
@dataclass(frozen=True)
class SSP:
"""Stacked sound-speed profile across media (grids, densities, optional Francois–Garrison)."""
n_media: int
n_mesh: np.ndarray
sigma: np.ndarray
depth: np.ndarray
z: np.ndarray
c: np.ndarray
cs: np.ndarray
rho: np.ndarray
npts: np.ndarray
raw: list[SSPRawLayer]
cz: np.ndarray | None = None
fg: FrancoisGarrisonParams | None = None
@dataclass(frozen=True)
class Bdry:
"""Top/bottom boundary options, SSP type, attenuation unit, and half-space structs."""
top_opt: str
bot_opt: str
ssp_type: str
atten_unit: str
top: Halfspace
bot: Halfspace
top_rho_inside: float
top_c_inside: complex
bot_rho_inside: float
bot_c_inside: complex
[docs]
@dataclass(frozen=True)
class EnvCore:
"""Common ``.env`` header: title, frequency, and full SSP plus boundary section."""
title: str
freq_hz: float
ssp: SSP
bdry: Bdry
[docs]
@dataclass(frozen=True)
class KrakenEnvTail:
"""Options after ``read_env_core`` for KRAKEN/SCOOTER/SPARC/BOUNCE (Matlab ``read_env.m``)."""
c_low: float
c_high: float
r_max_km: float
source_z: np.ndarray
receiver_z: np.ndarray
[docs]
@dataclass(frozen=True)
class ParsedEnvKraken:
"""Full ``.env`` parse for the KRAKEN-family tail section."""
core: EnvCore
tail: KrakenEnvTail
def _split_env_numeric_tokens(line: str) -> list[str]:
"""Split a Fortran env line, dropping ``!`` comments and a trailing ``/``."""
s = line.strip()
if "!" in s:
s = s.split("!", 1)[0].strip()
parts: list[str] = []
for tok in s.split():
if tok == "/":
break
parts.append(tok)
return parts
def _floats_from_line_prefix(line: str, count: int) -> list[float]:
"""Parse leading floats on a line, stopping at ``!`` (Fortran-style comment)."""
out: list[float] = []
for tok in line.split():
if tok.startswith("!"):
break
try:
out.append(float(tok))
except ValueError:
continue
if len(out) >= count:
break
if len(out) < count:
raise ValueError(f"expected at least {count} float(s) on line: {line!r}")
return out[:count]
def _parse_env_core_lines(lines: list[str], start: int = 0) -> tuple[EnvCore, int]:
"""Parse env core; return ``(EnvCore, next_line_index)`` (Matlab `read_env_core.m`)."""
reader = _LineIter(lines, start)
def next_line() -> str:
"""Forward to :meth:`_LineIter.next` (local alias for readability)."""
return reader.next()
# Defaults (Matlab globals)
alphaR = 1500.0
betaR = 0.0
rhoR = 1.0
alphaI = 0.0
betaI = 0.0
title = _extract_quoted(next_line())
freq_hz = float(next_line().split()[0])
n_media = int(next_line().split()[0])
top_opt_line = next_line()
top_opt = (_extract_quoted(top_opt_line) + " ")[:7]
# deprecated '*' → '~' at position 5 (1-based)
if len(top_opt) >= 5 and top_opt[4:5] == "*":
top_opt = top_opt[:4] + "~" + top_opt[5:]
ssp_type = top_opt[0:1]
top_bc = top_opt[1:2]
atten_unit = top_opt[2:4]
fg: FrancoisGarrisonParams | None = None
if len(top_opt) >= 4 and top_opt[3:4] == "F":
t, s, ph, zb = [float(x) for x in next_line().split()[:4]]
fg = FrancoisGarrisonParams(T=t, S=s, pH=ph, z_bar=zb)
def read_halfspace(bc: str) -> Halfspace:
"""Parse one acoustic half-space line when ``bc == 'A'``."""
nonlocal alphaR, betaR, rhoR, alphaI, betaI
if bc != "A":
return Halfspace(bc=bc, cp=0.0 + 0.0j, cs=0.0 + 0.0j, rho=0.0)
vals = [float(x) for x in _split_env_numeric_tokens(next_line())]
# ztmp present but not stored for halfspaces in Matlab (used for CRCI depth in Fortran)
ztmp_hs = float(vals[0]) if len(vals) >= 1 else 0.0
if len(vals) >= 2:
alphaR = vals[1]
if len(vals) >= 3:
betaR = vals[2]
if len(vals) >= 4:
rhoR = vals[3]
if len(vals) >= 5:
alphaI = vals[4]
if len(vals) >= 6:
betaI = vals[5]
cp = crci(alphaR, alphaI, freq_hz=freq_hz, atten_unit=atten_unit, fg=fg, depth_m=ztmp_hs)
cs = crci(betaR, betaI, freq_hz=freq_hz, atten_unit=atten_unit, fg=fg, depth_m=ztmp_hs)
return Halfspace(
bc=bc,
cp=cp,
cs=cs,
rho=rhoR,
raw=HalfspaceRaw(alphaR=alphaR, alphaI=alphaI, betaR=betaR, betaI=betaI, rho=rhoR),
)
top_hs = read_halfspace(top_bc)
# Read SSP layers
n_mesh = np.zeros(n_media, dtype=np.int32)
sigma = np.zeros(n_media, dtype=np.float64)
depth = np.zeros(n_media + 1, dtype=np.float64)
z_all: list[float] = []
c_all: list[complex] = []
cs_all: list[complex] = []
rho_all: list[float] = []
npts = np.zeros(n_media, dtype=np.int32)
raw_layers: list[SSPRawLayer] = []
loc = np.zeros(n_media, dtype=np.int32)
n_first_acoustic = 0
n_last_acoustic = 0
for medium in range(n_media):
if medium == 0:
loc[medium] = 0
else:
loc[medium] = int(loc[medium - 1] + npts[medium - 1])
parts = next_line().split()
n_mesh[medium] = int(parts[0])
sigma[medium] = float(parts[1])
depth[medium + 1] = float(parts[2])
layer_z: list[float] = []
layer_alphaR: list[float] = []
layer_alphaI: list[float] = []
layer_betaR: list[float] = []
layer_betaI: list[float] = []
layer_rho: list[float] = []
has_shear = False
target = depth[medium + 1]
for _ in range(10_000_000):
row = _split_env_numeric_tokens(next_line())
if not row:
continue
ztmp = float(row[0])
if len(row) >= 2:
alphaR = float(row[1])
if len(row) >= 3:
betaR = float(row[2])
if len(row) >= 4:
rhoR = float(row[3])
if len(row) >= 5:
alphaI = float(row[4])
if len(row) >= 6:
betaI = float(row[5])
cp = crci(alphaR, alphaI, freq_hz=freq_hz, atten_unit=atten_unit, fg=fg, depth_m=ztmp)
css = crci(betaR, betaI, freq_hz=freq_hz, atten_unit=atten_unit, fg=fg, depth_m=ztmp)
z_all.append(ztmp)
c_all.append(cp)
cs_all.append(css)
rho_all.append(rhoR)
layer_z.append(ztmp)
layer_alphaR.append(alphaR)
layer_alphaI.append(alphaI)
layer_betaR.append(betaR)
layer_betaI.append(betaI)
layer_rho.append(rhoR)
if betaR != 0.0:
has_shear = True
if abs(ztmp - target) <= 1e-7:
if medium == 0:
depth[0] = z_all[0]
break
else:
raise ValueError("SSP layer parse exceeded iteration limit")
npts[medium] = len(layer_z)
raw_layers.append(
SSPRawLayer(
z=np.asarray(layer_z, dtype=np.float64),
alphaR=np.asarray(layer_alphaR, dtype=np.float64),
alphaI=np.asarray(layer_alphaI, dtype=np.float64),
betaR=np.asarray(layer_betaR, dtype=np.float64),
betaI=np.asarray(layer_betaI, dtype=np.float64),
rho=np.asarray(layer_rho, dtype=np.float64),
)
)
if n_mesh[medium] == 0:
C = betaR if betaR > 0.0 else alphaR
deltaz = 0.05 * C / freq_hz
n_mesh[medium] = int(round((depth[medium + 1] - depth[medium]) / deltaz))
n_mesh[medium] = max(int(n_mesh[medium]), 10)
if not has_shear:
if n_first_acoustic == 0:
n_first_acoustic = medium + 1 # 1-based like Matlab
n_last_acoustic = medium + 1
if medium == 0 and len(z_all) >= 2:
hv = np.diff(np.asarray(z_all, dtype=np.float64))
cz = np.diff(np.asarray(c_all, dtype=np.complex128)) / hv
else:
cz = None
bot_opt_line = next_line()
bot_opt = (_extract_quoted(bot_opt_line) + " ")[:3]
if len(bot_opt) >= 2 and bot_opt[1:2] == "*":
bot_opt = bot_opt[:1] + "~" + bot_opt[2:]
bot_bc = bot_opt[0:1]
bot_hs = read_halfspace(bot_bc)
# Boundary depths
top_hs = Halfspace(**{**top_hs.__dict__, "depth": float(depth[0])})
bot_hs = Halfspace(**{**bot_hs.__dict__, "depth": float(depth[-1])})
# rho/c just inside boundaries
if n_first_acoustic == 0 or n_last_acoustic == 0:
raise ValueError("no acoustic medium found")
top_inside_idx = n_first_acoustic - 1
bot_inside_idx = int(loc[n_last_acoustic - 1] + npts[n_last_acoustic - 1] - 1)
z_arr = np.asarray(z_all, dtype=np.float64)
c_arr = np.asarray(c_all, dtype=np.complex128)
cs_arr = np.asarray(cs_all, dtype=np.complex128)
rho_arr = np.asarray(rho_all, dtype=np.float64)
ssp = SSP(
n_media=n_media,
n_mesh=n_mesh,
sigma=sigma,
depth=depth,
z=z_arr,
c=c_arr,
cs=cs_arr,
rho=rho_arr,
npts=npts,
raw=raw_layers,
cz=cz,
fg=fg,
)
bdry = Bdry(
top_opt=top_opt,
bot_opt=bot_opt,
ssp_type=ssp_type,
atten_unit=atten_unit,
top=top_hs,
bot=bot_hs,
top_rho_inside=float(rho_arr[top_inside_idx]),
top_c_inside=complex(c_arr[top_inside_idx]),
bot_rho_inside=float(rho_arr[bot_inside_idx]),
bot_c_inside=complex(c_arr[bot_inside_idx]),
)
return EnvCore(title=title, freq_hz=freq_hz, ssp=ssp, bdry=bdry), reader.i
[docs]
def parse_env_core(text: str) -> EnvCore:
"""Parse the core `.env` structure (port of Matlab `read_env_core.m`).
This is intended as a faithful, **filesystem-free** parser. It is line-oriented,
matching Matlab's `fscanf` + `fgetl` pattern.
Lines after the core block (e.g. KRAKEN ``CMIN``/``CMAX``, ``RMAX``, source/receiver
depths) are ignored. Use :func:`parse_env_kraken` to read those.
"""
lines = text.splitlines()
env, _ = _parse_env_core_lines(lines, 0)
return env
[docs]
def parse_env_core_bytes(data: bytes, *, encoding: str = "utf-8") -> EnvCore:
"""Like :func:`parse_env_core` after decoding ``data`` with ``encoding``."""
return parse_env_core(data.decode(encoding, errors="replace"))
[docs]
def parse_env_kraken(text: str) -> ParsedEnvKraken:
"""Parse a full ``.env`` including the KRAKEN-family tail (Matlab ``read_env``).
After :func:`read_env_core` Matlab reads ``cInt`` (CMIN/CMAX), ``RMax``, then
:func:`at_py.readwrite.vector.parse_readvector_lines` twice for source and receiver
depths (``readszrz``).
Applies to models using that block: ``KRAKEN``, ``SCOOTER``, ``SPARC``, ``BOUNCE``,
etc. (see Matlab ``read_env.m``).
"""
lines = text.splitlines()
core, i = _parse_env_core_lines(lines, 0)
need = 6
if len(lines) - i < need:
raise ValueError(
"expected KRAKEN-family tail after env core: CMIN/CMAX line, RMAX line, NSD+SD, NRD+RD"
)
c_low, c_high = _floats_from_line_prefix(lines[i], 2)
r_max_km = _floats_from_line_prefix(lines[i + 1], 1)[0]
sd, _nsd, j = parse_readvector_lines(lines, i + 2)
rd, _nrd, k = parse_readvector_lines(lines, j)
for tail_i in range(k, len(lines)):
if lines[tail_i].strip():
raise ValueError(f"unexpected line after KRAKEN env tail: {lines[tail_i]!r}")
tail = KrakenEnvTail(
c_low=c_low,
c_high=c_high,
r_max_km=r_max_km,
source_z=sd,
receiver_z=rd,
)
return ParsedEnvKraken(core=core, tail=tail)
[docs]
def parse_env_kraken_bytes(data: bytes, *, encoding: str = "utf-8") -> ParsedEnvKraken:
"""Like :func:`parse_env_kraken` after decoding ``data`` with ``encoding``."""
return parse_env_kraken(data.decode(encoding, errors="replace"))