Source code for at_py.readwrite.env_format

"""Format ``.env`` text from parsed structures (inverse of :mod:`at_py.readwrite.env`)."""

from __future__ import annotations

from typing import Literal

import numpy as np

from at_py.readwrite.env import SSP, EnvCore, HalfspaceRaw, ParsedEnvKraken
from at_py.readwrite.env_bellhop import BellhopBeamParams, ParsedEnvBellhop, ParsedEnvBellhop3D
from at_py.readwrite.env_ram import ParsedEnvRAM, format_env_ram
from at_py.readwrite.vector import format_readvector_lines


def _quote_env(s: str) -> str:
    """Wrap ``s`` in single quotes, writing each embedded ``'`` as ``''``."""
    # Splitting on the quote and re-joining with two quotes doubles every one.
    escaped = "''".join(s.split("'"))
    return f"'{escaped}'"


def _fmt_env_float(x: float) -> str:
    """Render ``x`` in general (``g``) notation with 17 significant digits."""
    return format(x, ".17g")


def _format_halfspace_line(z: float, raw: HalfspaceRaw) -> str:
    """Build a single half-space line: six space-separated numbers, then ``/``.

    The numbers appear in the order ``z``, ``raw.alphaR``, ``raw.betaR``,
    ``raw.rho``, ``raw.alphaI``, ``raw.betaI``.
    """
    columns = (z, raw.alphaR, raw.betaR, raw.rho, raw.alphaI, raw.betaI)
    cells = [_fmt_env_float(value) for value in columns]
    cells.append("/")
    return " ".join(cells)


def _format_ssp_medium_lines(ssp: SSP, medium: int) -> list[str]:
    """Build the env text lines for the SSP medium at index ``medium``.

    The first returned line holds ``ssp.n_mesh[medium]`` (as an int),
    ``ssp.sigma[medium]`` and ``ssp.depth[medium + 1]``. It is followed by one
    ``/``-terminated line per entry of ``ssp.raw[medium].z`` carrying z, alphaR,
    betaR, rho, alphaI and betaI for that entry.
    """
    profile = ssp.raw[medium]
    header = " ".join(
        (
            str(int(ssp.n_mesh[medium])),
            _fmt_env_float(ssp.sigma[medium]),
            # The header depth is read one slot past the medium index.
            _fmt_env_float(ssp.depth[medium + 1]),
        )
    )
    out: list[str] = [header]
    for k in range(int(profile.z.size)):
        values = (
            float(profile.z[k]),
            profile.alphaR[k],
            profile.betaR[k],
            profile.rho[k],
            profile.alphaI[k],
            profile.betaI[k],
        )
        out.append(" ".join(map(_fmt_env_float, values)) + " /")
    return out


def format_parsed_env(
    parsed: ParsedEnvKraken | ParsedEnvBellhop | ParsedEnvBellhop3D | ParsedEnvRAM,
) -> str:
    """Format a parsed environment as full ``.env`` text.

    Accepts any structure returned by
    :func:`~at_py.readwrite.env_dispatch.parse_read_env` and hands it to the
    formatter matching its type.

    Raises:
        TypeError: If ``parsed`` is not an instance of a supported type.
    """
    # Types are tested in this order and the first match wins.
    formatters = (
        (ParsedEnvKraken, format_env_kraken),
        (ParsedEnvBellhop, format_env_bellhop),
        (ParsedEnvBellhop3D, format_env_bellhop3d),
        (ParsedEnvRAM, format_env_ram),
    )
    for env_type, formatter in formatters:
        if isinstance(parsed, env_type):
            return formatter(parsed)
    raise TypeError(f"unsupported parsed env type: {type(parsed)!r}")
def format_parsed_env_bytes(
    parsed: ParsedEnvKraken | ParsedEnvBellhop | ParsedEnvBellhop3D | ParsedEnvRAM,
    *,
    encoding: str = "utf-8",
) -> bytes:
    """Return the :func:`format_parsed_env` text encoded with ``encoding``.

    ``encoding`` is keyword-only and defaults to UTF-8.
    """
    text = format_parsed_env(parsed)
    return text.encode(encoding)
def format_env_core(core: EnvCore) -> str:
    """Render the core ``.env`` block as newline-terminated text.

    Counterpart of the core section written by Matlab ``write_env`` /
    ``writessp``. Lines are emitted in this order: quoted title, frequency,
    media count, quoted top option, an optional Francois–Garrison parameter
    line, an optional top half-space line, the SSP lines of every medium, the
    quoted bottom option and an optional bottom half-space line.

    Raises:
        ValueError: If the fourth character of the top option is ``F`` but
            ``core.ssp.fg`` is ``None``, or if a boundary whose ``bc`` is
            ``"A"`` has ``raw`` or ``depth`` set to ``None``.
    """

    def halfspace_rows(side, missing_msg: str) -> list[str]:
        # Only a boundary whose condition is "A" contributes a coefficient line.
        if side.bc != "A":
            return []
        if side.raw is None or side.depth is None:
            raise ValueError(missing_msg)
        return [_format_halfspace_line(float(side.depth), side.raw)]

    boundary, profile = core.bdry, core.ssp
    out: list[str] = [
        _quote_env(core.title),
        _fmt_env_float(core.freq_hz),
        str(profile.n_media),
    ]

    # One space is appended and the result is cut to at most 7 characters; the
    # quoted form written to the file has trailing whitespace removed.
    top_code = (boundary.top_opt + " ")[:7]
    out.append(_quote_env(top_code.rstrip()))

    # The slice is empty when the option is shorter than 4 characters, so no
    # separate length check is needed.
    if top_code[3:4] == "F":
        fg_params = profile.fg
        if fg_params is None:
            raise ValueError("top_opt requests Francois–Garrison (F) but ssp.fg is None")
        fg_values = (fg_params.T, fg_params.S, fg_params.pH, fg_params.z_bar)
        out.append(" ".join(_fmt_env_float(v) for v in fg_values))

    out += halfspace_rows(
        boundary.top,
        "acoustic top halfspace requires raw coefficients and depth",
    )

    for medium in range(profile.n_media):
        out += _format_ssp_medium_lines(profile, medium)

    bottom_code = (boundary.bot_opt + " ")[:3]
    out.append(_quote_env(bottom_code.rstrip()))

    out += halfspace_rows(
        boundary.bot,
        "acoustic bottom halfspace requires raw coefficients and depth",
    )

    return "\n".join(out) + "\n"
def format_env_core_bytes(core: EnvCore, *, encoding: str = "utf-8") -> bytes:
    """Return the :func:`format_env_core` text encoded with ``encoding``.

    ``encoding`` is keyword-only and defaults to UTF-8.
    """
    text = format_env_core(core)
    return text.encode(encoding)
def format_env_kraken(p: ParsedEnvKraken) -> str:
    """Render a KRAKEN-family ``.env`` file as newline-terminated text.

    Inverse of :func:`at_py.readwrite.env.parse_env_kraken`. The core block
    comes first, followed by a ``c_low c_high`` line, the ``r_max_km`` line and
    the flattened source and receiver depth vectors.
    """
    core, tail = p.core, p.tail
    # Strip the core block's trailing newline so the final join does not
    # produce an empty line between sections.
    core_text = format_env_core(core).rstrip("\n")

    source_depths = np.asarray(tail.source_z, dtype=np.float64).reshape(-1)
    receiver_depths = np.asarray(tail.receiver_z, dtype=np.float64).reshape(-1)

    chunks: list[str] = [core_text]
    chunks.append(" ".join((_fmt_env_float(tail.c_low), _fmt_env_float(tail.c_high))))
    chunks.append(_fmt_env_float(tail.r_max_km))
    chunks.extend(
        format_readvector_lines(int(depths.size), depths).rstrip("\n")
        for depths in (source_depths, receiver_depths)
    )
    return "\n".join(chunks) + "\n"
def format_env_kraken_bytes(p: ParsedEnvKraken, *, encoding: str = "utf-8") -> bytes:
    """Return the :func:`format_env_kraken` text encoded with ``encoding``.

    ``encoding`` is keyword-only and defaults to UTF-8.
    """
    text = format_env_kraken(p)
    return text.encode(encoding)
def _angle_fan_data_line(alpha_deg: np.ndarray) -> str:
    """Render an angle grid (degrees) as a single env line.

    One or two angles are written out verbatim. A longer grid that
    ``np.allclose`` considers equal to an evenly spaced grid between its first
    and last values is shortened to ``first last /``; any other grid is written
    out in full.

    Raises:
        ValueError: If ``alpha_deg`` contains no values.
    """
    angles = np.asarray(alpha_deg, dtype=np.float64).reshape(-1)
    count = int(angles.size)
    if count < 1:
        raise ValueError("empty angle fan")

    if count > 2:
        first, last = float(angles[0]), float(angles[-1])
        evenly_spaced = np.linspace(first, last, count, dtype=np.float64)
        if np.allclose(angles, evenly_spaced):
            return " ".join((_fmt_env_float(first), _fmt_env_float(last), "/"))

    # Short grids and non-uniform grids list every value explicitly.
    return " ".join(_fmt_env_float(float(value)) for value in angles)


def _format_bellhop_post_run_lines(
    beam: BellhopBeamParams,
    top_opt_7: str,
    model: Literal["BELLHOP", "BELLHOP3D"],
) -> list[str]:
    """Build the beam-block lines that follow the quoted run-type line.

    Used for both Bellhop and Bellhop3D. Emits the beam count, an optional
    ``ibeam`` line, the alpha fan, the beta count and fan (``BELLHOP3D`` only),
    the step size, the box limits and, unless the second run-type character
    upper-cases to ``G``, ``B`` or ``S``, the five extra beam-parameter lines.

    Raises:
        ValueError: If a field required by the selected ``model`` or run type
            is ``None``.
    """
    top_code = (top_opt_7 + " ")[:7]
    rows: list[str] = [str(int(beam.nbeams))]

    # The slice is empty for a short option string, so the comparison alone
    # covers the length check.
    if top_code[5:6] == "I" and beam.ibeam is not None:
        rows.append(str(int(beam.ibeam)))
    rows.append(_angle_fan_data_line(beam.alpha_deg))

    if model == "BELLHOP3D":
        if beam.beta_deg is None or beam.nbeta is None:
            raise ValueError("Bellhop3D beam block requires beta_deg and nbeta")
        rows.append(str(int(beam.nbeta)))
        rows.append(_angle_fan_data_line(beam.beta_deg))

    rows.append(_fmt_env_float(beam.deltas_m))

    if model == "BELLHOP":
        if beam.box_r_km is None:
            raise ValueError("Bellhop 2D requires box_r_km")
        box_limits = (beam.box_z_m, beam.box_r_km)
    else:
        if beam.box_x_km is None or beam.box_y_km is None:
            raise ValueError("Bellhop3D requires box_x_km, box_y_km")
        box_limits = (beam.box_x_km, beam.box_y_km, beam.box_z_m)
    rows.extend(_fmt_env_float(limit) for limit in box_limits)

    run_code = (beam.run_type + " ")[:5]
    if run_code[1:2].upper() in ("G", "B", "S"):
        # These run types carry no extra beam parameters.
        return rows

    if beam.beam_type_ms is None or beam.epmult is None or beam.r_loop is None:
        raise ValueError("beam_type_ms, epmult, r_loop required when run_type[1] not in G,B,S")
    if beam.n_image is None or beam.ib_win is None:
        raise ValueError("n_image and ib_win required when run_type[1] not in G,B,S")

    beam_type = (beam.beam_type_ms + " ")[:5]
    rows += [
        _quote_env(beam_type.rstrip()),
        _fmt_env_float(beam.epmult),
        _fmt_env_float(beam.r_loop),
        str(int(beam.n_image)),
        str(int(beam.ib_win)),
    ]
    return rows
def format_env_bellhop(p: ParsedEnvBellhop) -> str:
    """Render a 2D Bellhop ``.env`` file as newline-terminated text.

    Inverse of :func:`parse_env_bellhop`. The core block is followed by the
    flattened source-depth, receiver-depth and receiver-range vectors, the
    quoted run type and the beam-block lines.
    """
    core, tail = p.core, p.tail
    # Strip the core block's trailing newline so the final join does not
    # produce an empty line between sections.
    core_text = format_env_core(core).rstrip("\n")

    vectors = [
        np.asarray(values, dtype=np.float64).reshape(-1)
        for values in (tail.source_z_m, tail.receiver_z_m, tail.receiver_range_km)
    ]
    top_code = (core.bdry.top_opt + " ")[:7]

    chunks: list[str] = [core_text]
    chunks.extend(
        format_readvector_lines(int(vec.size), vec).rstrip("\n") for vec in vectors
    )
    chunks.append(_quote_env((tail.beam.run_type + " ")[:5]))
    chunks.extend(_format_bellhop_post_run_lines(tail.beam, top_code, "BELLHOP"))
    return "\n".join(chunks) + "\n"
def format_env_bellhop_bytes(p: ParsedEnvBellhop, *, encoding: str = "utf-8") -> bytes:
    """Return the :func:`format_env_bellhop` text encoded with ``encoding``.

    ``encoding`` is keyword-only and defaults to UTF-8.
    """
    text = format_env_bellhop(p)
    return text.encode(encoding)
def format_env_bellhop3d(p: ParsedEnvBellhop3D) -> str:
    """Render a Bellhop3D ``.env`` file as newline-terminated text.

    Inverse of :func:`parse_env_bellhop3d`. The core block is followed by the
    source x and y vectors (each divided by 1000 before writing), the source
    depth, receiver depth, receiver range and receiver bearing vectors, the
    quoted run type and the beam-block lines.
    """
    core, tail = p.core, p.tail
    # Strip the core block's trailing newline so the final join does not
    # produce an empty line between sections.
    core_text = format_env_core(core).rstrip("\n")

    # Source x/y are stored in ``*_m`` fields and scaled by 1/1000 for output.
    scaled_xy = [
        np.asarray(values, dtype=np.float64).reshape(-1) / 1000.0
        for values in (tail.source_x_m, tail.source_y_m)
    ]
    unscaled = [
        np.asarray(values, dtype=np.float64).reshape(-1)
        for values in (
            tail.source_z_m,
            tail.receiver_z_m,
            tail.receiver_range_km,
            tail.receiver_theta_deg,
        )
    ]
    top_code = (core.bdry.top_opt + " ")[:7]

    chunks: list[str] = [core_text]
    chunks.extend(
        format_readvector_lines(int(vec.size), vec).rstrip("\n")
        for vec in scaled_xy + unscaled
    )
    chunks.append(_quote_env((tail.beam.run_type + " ")[:5]))
    chunks.extend(_format_bellhop_post_run_lines(tail.beam, top_code, "BELLHOP3D"))
    return "\n".join(chunks) + "\n"
def format_env_bellhop3d_bytes(p: ParsedEnvBellhop3D, *, encoding: str = "utf-8") -> bytes:
    """Return the :func:`format_env_bellhop3d` text encoded with ``encoding``.

    ``encoding`` is keyword-only and defaults to UTF-8.
    """
    text = format_env_bellhop3d(p)
    return text.encode(encoding)