"""RAM ``ram.in`` environmental file (Matlab ``write_env_RAM.m`` / RAM Fortran input)."""
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
from at_py.readwrite.env import _LineIter
def _before_comment(line: str) -> str:
"""Text before ``!`` (Fortran-style comment), stripped."""
if "!" in line:
return line.split("!", 1)[0].strip()
return line.strip()
def _comment_hint(line: str) -> str:
"""Lowercased comment tail after ``!``, if any."""
if "!" not in line:
return ""
return line.split("!", 1)[1].strip().lower()
def _extract_title(line: str) -> str:
"""First single-quoted Fortran string (Matlab ``'%s'`` title line)."""
parts = line.split("'")
if len(parts) < 3:
raise ValueError(f"expected quoted RAM title line, got: {line!r}")
return parts[1].replace("''", "'")
def _is_terminator(line: str) -> bool:
"""``-1 -1`` end marker (optional comment after ``!``)."""
head = _before_comment(line)
return head.replace(" ", " ").strip() in {"-1 -1", "-1.0 -1.0"}
[docs]
@dataclass(frozen=True)
class RamSegment:
    """One sound-speed profile block of a RAM ``ram.in`` file.

    Mirrors one iteration of the inner loop of Matlab ``write_env_RAM``.
    Instances are immutable (``frozen=True``).

    Attributes (units are suggested by the ``_m`` suffixes only — TODO confirm):
        rp_m: Value read from the block's leading ``rp`` line, or ``None``
            when the block has no such line (the parser never reads one for
            the first block).
        z_m: First number of every ``z c`` profile line.
        c_m: Second number of every ``z c`` profile line.
        sediment_cp: Second number on the sediment line (the one with ``/``).
        rho: Second number on the density line.
        attn_z_m: First numbers of the attenuation lines.
        attn_alpha: Second numbers of the attenuation lines.
    """

    # Range marker; optional because the first block carries none.
    rp_m: float | None
    # Water-column profile.
    z_m: np.ndarray
    c_m: np.ndarray
    # Bottom description.
    sediment_cp: float
    rho: float
    attn_z_m: np.ndarray
    attn_alpha: np.ndarray
[docs]
@dataclass(frozen=True)
class ParsedEnvRAM:
    """Immutable result of parsing a RAM ``ram.in`` file.

    This is the inverse of Matlab ``write_env_RAM.m`` for its canonical
    output. Fields are listed in the order they appear in the file.

    NOTE(review): the physical meaning and units of the header values are
    inferred from their names only — confirm against the RAM documentation.

    Attributes:
        title: Quoted string from the first non-empty line.
        freq_hz, zs_m, zr_m: The three numbers of header line 1.
        rmax_m, dr, ndr: The three numbers of header line 2 (``ndr`` as int).
        zmax, dz, ndz, zmplt: The four numbers of header line 3 (``ndz`` as int).
        c0, n_pade, n_starter, rs: The four numbers of header line 4
            (``n_pade`` and ``n_starter`` as int).
        bathy_r_m, bathy_z_m: First and second columns of the bathymetry
            lines that follow the header.
        segments: The profile blocks, in file order.
    """

    title: str

    # Header line 1.
    freq_hz: float
    zs_m: float
    zr_m: float

    # Header line 2.
    rmax_m: float
    dr: float
    ndr: int

    # Header line 3.
    zmax: float
    dz: float
    ndz: int
    zmplt: float

    # Header line 4.
    c0: float
    n_pade: int
    n_starter: int
    rs: float

    # Bathymetry table.
    bathy_r_m: np.ndarray
    bathy_z_m: np.ndarray

    # Profile blocks.
    segments: tuple[RamSegment, ...]
def _parse_floats(line: str, n: int) -> list[float]:
    """Convert the first ``n`` whitespace-separated tokens of ``line`` to floats.

    Only the text before a ``!`` comment is considered; tokens beyond the
    first ``n`` are ignored.

    Raises:
        ValueError: if fewer than ``n`` tokens are present, or if one of the
            first ``n`` tokens is not a valid float literal.
    """
    tokens = _before_comment(line).split()
    if len(tokens) < n:
        raise ValueError(f"expected at least {n} numbers in line: {line!r}")
    # Pairing with ``range(n)`` caps the result at ``n`` values.
    return [float(token) for token, _ in zip(tokens, range(n))]
def _read_zc_block(reader: _LineIter) -> tuple[np.ndarray, np.ndarray]:
    """Collect ``z c`` pairs from ``reader`` until the ``-1 -1`` marker.

    Blank lines are skipped. Reading stops in one of two ways:

    * a terminator line is found — it is consumed;
    * a non-blank line whose ``!`` comment does not contain ``z c`` is
      found — it is left unconsumed for the caller.

    Returns:
        Two ``float64`` arrays holding the first and second number of every
        profile line read.

    Raises:
        ValueError: if the lines run out before either stop condition.
    """
    rows = reader._lines
    depths: list[float] = []
    speeds: list[float] = []

    while True:
        if reader.i >= len(rows):
            raise ValueError("unexpected EOF inside RAM z/c block")

        row = rows[reader.i]
        if not row.strip():
            reader.i += 1
            continue
        if _is_terminator(row):
            reader.i += 1
            break
        # A hint containing "z cw" necessarily contains "z c", so one
        # substring test covers both spellings.
        if "z c" not in _comment_hint(row):
            break

        depth, speed = _parse_floats(row, 2)
        depths.append(depth)
        speeds.append(speed)
        reader.i += 1

    return (
        np.asarray(depths, dtype=np.float64),
        np.asarray(speeds, dtype=np.float64),
    )
def _read_bottom_block(reader: _LineIter) -> tuple[float, float, np.ndarray, np.ndarray]:
    """Read the bottom description that follows a ``z c`` profile.

    Expected layout, starting at the reader's current line:

    1. sediment line containing ``/`` (second number is kept), then ``-1 -1``;
    2. density line (second number is kept), then ``-1 -1``;
    3. three attenuation lines of two numbers each, then ``-1 -1``.

    Returns:
        ``(sediment_cp, rho, attn_z, attn_alpha)`` where the last two are
        ``float64`` arrays of length three.

    Raises:
        ValueError: on premature end of input, a missing terminator, or a
            malformed sediment/number line.
    """
    rows = reader._lines

    def take_line(eof_message: str) -> str:
        # Return the current line and advance; fail with the given message at EOF.
        if reader.i >= len(rows):
            raise ValueError(eof_message)
        row = rows[reader.i]
        reader.i += 1
        return row

    def expect_terminator(message: str) -> None:
        # Consume a ``-1 -1`` line or fail with the given message.
        if reader.i >= len(rows) or not _is_terminator(rows[reader.i]):
            raise ValueError(message)
        reader.i += 1

    # --- sediment ---------------------------------------------------------
    sediment_row = take_line("unexpected EOF in RAM bottom block")
    sediment_text = _before_comment(sediment_row)
    if "/" not in sediment_text:
        raise ValueError(f"expected sediment line with '/', got: {sediment_row!r}")
    # The slash is only a marker; treat it as whitespace when tokenising.
    sediment_tokens = sediment_text.replace("/", " ").split()
    if len(sediment_tokens) < 2:
        raise ValueError(f"invalid sediment line: {sediment_row!r}")
    sediment_cp = float(sediment_tokens[1])
    expect_terminator("expected -1 -1 after RAM sediment line")

    # --- density ----------------------------------------------------------
    density_row = take_line("unexpected EOF after sediment terminator")
    rho = _parse_floats(density_row, 2)[1]
    expect_terminator("expected -1 -1 after RAM density line")

    # --- attenuation ------------------------------------------------------
    attn_depths: list[float] = []
    attn_values: list[float] = []
    for _ in range(3):
        attn_row = take_line("unexpected EOF in RAM attenuation lines")
        depth, alpha = _parse_floats(attn_row, 2)
        attn_depths.append(depth)
        attn_values.append(alpha)
    expect_terminator("expected -1 -1 after RAM attenuation block")

    return (
        sediment_cp,
        rho,
        np.asarray(attn_depths, dtype=np.float64),
        np.asarray(attn_values, dtype=np.float64),
    )
def _parse_one_segment(reader: _LineIter, *, first: bool) -> RamSegment | None:
    """Parse the next profile block from ``reader``.

    Leading blank lines are skipped. Unless ``first`` is true, the block may
    start with a range line, written either as ``<value> ! ... rp ...`` or
    as ``<value> rp``; that line is consumed and its value stored in
    ``rp_m``. The ``z c`` profile and the bottom block follow.

    Returns:
        The parsed segment, or ``None`` when only blank lines (or nothing)
        remain.

    Raises:
        ValueError: if the profile is empty or any sub-block is malformed.
    """
    rows = reader._lines

    # Skip blank padding between blocks.
    while reader.i < len(rows) and not rows[reader.i].strip():
        reader.i += 1
    if reader.i >= len(rows):
        return None

    head_row = rows[reader.i]
    head_hint = _comment_hint(head_row)
    head_tokens = _before_comment(head_row).split()

    rp: float | None = None
    if not first:
        # Canonical form: a lone number tagged by an "rp" comment.
        tagged_by_comment = len(head_tokens) == 1 and "rp" in head_hint
        # Alternate form: ``25000.0 rp`` with no ``!``.
        tagged_inline = len(head_tokens) >= 2 and head_tokens[1].lower() == "rp"
        if tagged_by_comment or tagged_inline:
            rp = float(head_tokens[0])
            reader.i += 1

    depths, speeds = _read_zc_block(reader)
    if depths.size == 0:
        raise ValueError("empty RAM z/c profile")

    sediment_cp, density, attn_depths, attn_values = _read_bottom_block(reader)
    return RamSegment(
        rp_m=rp,
        z_m=depths,
        c_m=speeds,
        sediment_cp=sediment_cp,
        rho=density,
        attn_z_m=attn_depths,
        attn_alpha=attn_values,
    )
[docs]
def parse_env_ram(text: str) -> ParsedEnvRAM:
    """Parse the text of a RAM ``ram.in`` file.

    The expected layout is the canonical output of Matlab ``write_env_RAM``
    (values followed by ``!`` comments):

    1. quoted title line;
    2. three numbers: ``freq_hz zs_m zr_m``;
    3. three numbers: ``rmax_m dr ndr``;
    4. four numbers: ``zmax dz ndz zmplt``;
    5. four numbers: ``c0 n_pade n_starter rs``;
    6. bathymetry pairs, normally ended by ``-1 -1``;
    7. one or more profile blocks (see ``_parse_one_segment``).

    Blank lines are ignored throughout. Bathymetry lines are recognised
    either by an ``rb``/``zb`` comment or, without a comment, by consisting
    of exactly two numeric tokens; the table ends at the terminator, at the
    first line matching neither form, or at end of input.

    Args:
        text: Full contents of the file.

    Returns:
        The parsed header, bathymetry table and profile segments.

    Raises:
        ValueError: if the header is incomplete or any line is malformed.
    """
    lines = [ln.rstrip() for ln in text.splitlines()]
    reader = _LineIter(lines, 0)

    def next_nonempty() -> str:
        # Advance past blank lines and return the next line with content.
        while reader.i < len(lines):
            s = reader.next()
            if s.strip():
                return s
        raise ValueError("unexpected end of RAM env file")

    # --- header -----------------------------------------------------------
    title = _extract_title(next_nonempty())
    freq_hz, zs_m, zr_m = _parse_floats(next_nonempty(), 3)
    rmax_m, dr, ndr_f = _parse_floats(next_nonempty(), 3)
    zmax, dz, ndz_f, zmplt = _parse_floats(next_nonempty(), 4)
    c0, n_pade_f, n_star_f, rs = _parse_floats(next_nonempty(), 4)

    # --- bathymetry -------------------------------------------------------
    br: list[float] = []
    bz: list[float] = []
    while reader.i < len(lines):
        line = reader._lines[reader.i]
        if not line.strip():
            reader.i += 1
            continue
        if _is_terminator(line):
            reader.i += 1
            break

        hint = _comment_hint(line)
        if "rb" in hint and "zb" in hint:
            r_i, z_i = _parse_floats(line, 2)
        else:
            # Wedge-style bathy without comments: exactly two numeric tokens.
            tok = _before_comment(line).split()
            if len(tok) != 2:
                break
            try:
                # Convert BOTH values before storing either one, so a bad
                # second token cannot leave the two columns with different
                # lengths.
                r_i, z_i = float(tok[0]), float(tok[1])
            except ValueError:
                break

        br.append(r_i)
        bz.append(z_i)
        reader.i += 1

    # --- profile blocks ---------------------------------------------------
    segs: list[RamSegment] = []
    while reader.i < len(lines):
        # Only the very first block is parsed without looking for an rp line.
        seg = _parse_one_segment(reader, first=not segs)
        if seg is None:
            break
        segs.append(seg)

    return ParsedEnvRAM(
        title=title,
        freq_hz=freq_hz,
        zs_m=zs_m,
        zr_m=zr_m,
        rmax_m=rmax_m,
        dr=dr,
        ndr=int(ndr_f),
        zmax=zmax,
        dz=dz,
        ndz=int(ndz_f),
        zmplt=zmplt,
        c0=c0,
        n_pade=int(n_pade_f),
        n_starter=int(n_star_f),
        rs=rs,
        bathy_r_m=np.asarray(br, dtype=np.float64),
        bathy_z_m=np.asarray(bz, dtype=np.float64),
        segments=tuple(segs),
    )
[docs]
def parse_env_ram_bytes(data: bytes, *, encoding: str = "utf-8") -> ParsedEnvRAM:
    """Decode ``data`` and parse it with :func:`parse_env_ram`.

    Bytes that are invalid in ``encoding`` are substituted
    (``errors="replace"``) rather than raising a decode error.
    """
    decoded = data.decode(encoding, errors="replace")
    return parse_env_ram(decoded)
def _fmt8(x: float) -> str:
return f"{x:8.2f}"
def _fmt6(x: float) -> str:
return f"{x:6.2f}"
# Names exported by ``from <module> import *``.
# NOTE(review): ``format_env_ram`` and ``format_env_ram_bytes`` are listed
# here but no definition for them is visible in the reviewed portion of the
# file — confirm they are defined elsewhere in this module, otherwise a
# star-import will raise ``AttributeError``.
__all__ = [
    "ParsedEnvRAM",
    "RamSegment",
    "format_env_ram",
    "format_env_ram_bytes",
    "parse_env_ram",
    "parse_env_ram_bytes",
]