Source code for at_py.readwrite.env_ram

"""RAM ``ram.in`` environmental file (Matlab ``write_env_RAM.m`` / RAM Fortran input)."""

from __future__ import annotations

from dataclasses import dataclass

import numpy as np

from at_py.readwrite.env import _LineIter


def _before_comment(line: str) -> str:
    """Text before ``!`` (Fortran-style comment), stripped."""
    if "!" in line:
        return line.split("!", 1)[0].strip()
    return line.strip()


def _comment_hint(line: str) -> str:
    """Lowercased comment tail after ``!``, if any."""
    if "!" not in line:
        return ""
    return line.split("!", 1)[1].strip().lower()


def _extract_title(line: str) -> str:
    """First single-quoted Fortran string (Matlab ``'%s'`` title line)."""
    parts = line.split("'")
    if len(parts) < 3:
        raise ValueError(f"expected quoted RAM title line, got: {line!r}")
    return parts[1].replace("''", "'")


def _is_terminator(line: str) -> bool:
    """``-1 -1`` end marker (optional comment after ``!``)."""
    head = _before_comment(line)
    return head.replace("  ", " ").strip() in {"-1 -1", "-1.0 -1.0"}


[docs] @dataclass(frozen=True) class RamSegment: """One range-dependent SSP block (Matlab ``write_env_RAM`` inner loop).""" rp_m: float | None z_m: np.ndarray c_m: np.ndarray sediment_cp: float rho: float attn_z_m: np.ndarray attn_alpha: np.ndarray
[docs] @dataclass(frozen=True) class ParsedEnvRAM: """Parsed RAM ``ram.in`` (inverse of Matlab ``write_env_RAM.m`` for canonical output).""" title: str freq_hz: float zs_m: float zr_m: float rmax_m: float dr: float ndr: int zmax: float dz: float ndz: int zmplt: float c0: float n_pade: int n_starter: int rs: float bathy_r_m: np.ndarray bathy_z_m: np.ndarray segments: tuple[RamSegment, ...]
def _parse_floats(line: str, n: int) -> list[float]: """Parse ``n`` leading floats from the pre-comment part of ``line``.""" tok = _before_comment(line).split() if len(tok) < n: raise ValueError(f"expected at least {n} numbers in line: {line!r}") return [float(tok[i]) for i in range(n)] def _read_zc_block(reader: _LineIter) -> tuple[np.ndarray, np.ndarray]: """Read ``z c`` lines until ``-1 -1``.""" zs: list[float] = [] cs: list[float] = [] while reader.i < len(reader._lines): line = reader._lines[reader.i] if not line.strip(): reader.i += 1 continue if _is_terminator(line): reader.i += 1 break hint = _comment_hint(line) if "z c" not in hint and "z cw" not in hint: break a, b = _parse_floats(line, 2) zs.append(a) cs.append(b) reader.i += 1 else: raise ValueError("unexpected EOF inside RAM z/c block") return np.asarray(zs, dtype=np.float64), np.asarray(cs, dtype=np.float64) def _read_bottom_block(reader: _LineIter) -> tuple[float, float, np.ndarray, np.ndarray]: """Sediment (``/``), density, three attenuation pairs; each followed by ``-1 -1``.""" if reader.i >= len(reader._lines): raise ValueError("unexpected EOF in RAM bottom block") sed_line = reader._lines[reader.i] reader.i += 1 sed_raw = _before_comment(sed_line) if "/" not in sed_raw: raise ValueError(f"expected sediment line with '/', got: {sed_line!r}") sed_tok = [x for x in sed_raw.replace("/", " ").split() if x] if len(sed_tok) < 2: raise ValueError(f"invalid sediment line: {sed_line!r}") sediment_cp = float(sed_tok[1]) if reader.i >= len(reader._lines) or not _is_terminator(reader._lines[reader.i]): raise ValueError("expected -1 -1 after RAM sediment line") reader.i += 1 if reader.i >= len(reader._lines): raise ValueError("unexpected EOF after sediment terminator") rho_line = reader._lines[reader.i] reader.i += 1 rho = _parse_floats(rho_line, 2)[1] if reader.i >= len(reader._lines) or not _is_terminator(reader._lines[reader.i]): raise ValueError("expected -1 -1 after RAM density line") reader.i += 1 attn_z: list[float] = [] attn_a: list[float] = [] for _ in range(3): if reader.i >= len(reader._lines): raise ValueError("unexpected EOF in RAM attenuation lines") aline = reader._lines[reader.i] reader.i += 1 az, aa = _parse_floats(aline, 2) attn_z.append(az) attn_a.append(aa) if reader.i >= len(reader._lines) or not _is_terminator(reader._lines[reader.i]): raise ValueError("expected -1 -1 after RAM attenuation block") reader.i += 1 return ( sediment_cp, rho, np.asarray(attn_z, dtype=np.float64), np.asarray(attn_a, dtype=np.float64), ) def _parse_one_segment(reader: _LineIter, *, first: bool) -> RamSegment | None: """Parse one profile block; return ``None`` if at EOF.""" while reader.i < len(reader._lines) and not reader._lines[reader.i].strip(): reader.i += 1 if reader.i >= len(reader._lines): return None line0 = reader._lines[reader.i] rp: float | None = None hint0 = _comment_hint(line0) dc0 = _before_comment(line0) parts0 = dc0.split() if (not first) and len(parts0) == 1 and "rp" in hint0: rp = float(parts0[0]) reader.i += 1 elif (not first) and len(parts0) >= 2 and parts0[1].lower() == "rp": # Alternate: ``25000.0 rp`` (no ``!``) rp = float(parts0[0]) reader.i += 1 z_m, c_m = _read_zc_block(reader) if z_m.size == 0: raise ValueError("empty RAM z/c profile") sed_cp, rho, attn_z, attn_a = _read_bottom_block(reader) return RamSegment( rp_m=rp, z_m=z_m, c_m=c_m, sediment_cp=sed_cp, rho=rho, attn_z_m=attn_z, attn_alpha=attn_a, )
[docs] def parse_env_ram(text: str) -> ParsedEnvRAM: """Parse RAM ``ram.in`` (canonical Matlab ``write_env_RAM`` layout with ``!`` comments).""" raw_lines = text.splitlines() lines = [ln.rstrip() for ln in raw_lines] reader = _LineIter(lines, 0) def next_nonempty() -> str: while reader.i < len(lines): s = reader.next() if s.strip(): return s raise ValueError("unexpected end of RAM env file") title_line = next_nonempty() title = _extract_title(title_line) l1 = next_nonempty() freq_hz, zs_m, zr_m = _parse_floats(l1, 3) l2 = next_nonempty() rmax_m, dr, ndr = _parse_floats(l2, 3) ndr_i = int(ndr) l3 = next_nonempty() zmax, dz, ndz_f, zmplt = _parse_floats(l3, 4) ndz_i = int(ndz_f) l4 = next_nonempty() c0, n_pade_f, n_star_f, rs = _parse_floats(l4, 4) n_pade = int(n_pade_f) n_starter = int(n_star_f) br: list[float] = [] bz: list[float] = [] while reader.i < len(lines): line = reader._lines[reader.i] if not line.strip(): reader.i += 1 continue if _is_terminator(line): reader.i += 1 break hint = _comment_hint(line) if "rb zb" in hint or ("rb" in hint and "zb" in hint): r_i, z_i = _parse_floats(line, 2) br.append(r_i) bz.append(z_i) reader.i += 1 continue # Wedge-style bathy without comments tok = _before_comment(line).split() if len(tok) == 2: try: br.append(float(tok[0])) bz.append(float(tok[1])) reader.i += 1 continue except ValueError: pass break bathy_r_m = np.asarray(br, dtype=np.float64) bathy_z_m = np.asarray(bz, dtype=np.float64) segs: list[RamSegment] = [] first = True while reader.i < len(lines): seg = _parse_one_segment(reader, first=first) if seg is None: break segs.append(seg) first = False return ParsedEnvRAM( title=title, freq_hz=freq_hz, zs_m=zs_m, zr_m=zr_m, rmax_m=rmax_m, dr=dr, ndr=ndr_i, zmax=zmax, dz=dz, ndz=ndz_i, zmplt=zmplt, c0=c0, n_pade=n_pade, n_starter=n_starter, rs=rs, bathy_r_m=bathy_r_m, bathy_z_m=bathy_z_m, segments=tuple(segs), )
[docs] def parse_env_ram_bytes(data: bytes, *, encoding: str = "utf-8") -> ParsedEnvRAM: """Like :func:`parse_env_ram` but decodes ``data`` first.""" return parse_env_ram(data.decode(encoding, errors="replace"))
def _fmt8(x: float) -> str: return f"{x:8.2f}" def _fmt6(x: float) -> str: return f"{x:6.2f}"
[docs] def format_env_ram(p: ParsedEnvRAM) -> str: """Serialize RAM ``ram.in`` (inverse of :func:`parse_env_ram` for canonical Matlab layout).""" lines: list[str] = [] t = p.title.replace("'", "''") lines.append(f"'{t}' ! Title ") # Matlab ``fprintf`` uses ``%8.2f`` fields concatenated (same as Dickins ``ram.in``). lines.append(f"{_fmt8(p.freq_hz)}{_fmt8(p.zs_m)}{_fmt8(p.zr_m)} ! Frequency (Hz) zs zr ") lines.append(f"{_fmt8(p.rmax_m)}{_fmt8(p.dr)}{p.ndr:3d} ! rmax dr ndr ") lines.append(f"{_fmt8(p.zmax)}{_fmt8(p.dz)}{p.ndz:3d}{_fmt8(p.zmplt)} ! zmax dz ndz zmplt ") lines.append(f"{_fmt8(p.c0)} {p.n_pade} {p.n_starter} {_fmt8(p.rs)} ! c0 np ns rs ") for i in range(int(p.bathy_r_m.size)): lines.append(f"{_fmt8(float(p.bathy_r_m[i]))}{_fmt8(float(p.bathy_z_m[i]))} ! rb zb ") lines.append("-1 -1 ") for si, seg in enumerate(p.segments): if si > 0 and seg.rp_m is not None: lines.append(f"\t {_fmt6(seg.rp_m)} \t ! rp ") for j in range(int(seg.z_m.size)): lines.append(f"\t {_fmt6(float(seg.z_m[j]))} {_fmt6(float(seg.c_m[j]))} \t ! z c ") lines.append("-1 -1 ") lines.append(f" {_fmt6(0.0)} {_fmt6(seg.sediment_cp)} / \t ! lower halfspace ") lines.append("-1 -1 ") lines.append(f" {_fmt6(0.0)} {_fmt6(seg.rho)} \t ! lower halfspace ") lines.append("-1 -1 ") for k in range(3): az = _fmt6(float(seg.attn_z_m[k])) aa = _fmt6(float(seg.attn_alpha[k])) lines.append(f" {az} {aa} \t ! lower halfspace ") lines.append("-1 -1 ") return "\n".join(lines) + "\n"
[docs] def format_env_ram_bytes(p: ParsedEnvRAM, *, encoding: str = "utf-8") -> bytes: """Like :func:`format_env_ram` encoded to bytes.""" return format_env_ram(p).encode(encoding)
__all__ = [ "ParsedEnvRAM", "RamSegment", "format_env_ram", "format_env_ram_bytes", "parse_env_ram", "parse_env_ram_bytes", ]