Source code for at_py.readwrite.modes

"""Kraken mode file readers: binary (``read_modes_bin.m``) and ASCII (``read_modes_asc.m``)."""

from __future__ import annotations

import struct
from dataclasses import dataclass
from typing import Literal

import numpy as np


[docs] @dataclass class ModeHalfspace: """Top/bottom acoustic half-space coefficients from binary ``.mod`` (Matlab ``Top``/``Bot``).""" bc: str cp: np.complex128 cs: np.complex128 rho: np.float32 depth: np.float32
[docs] @dataclass class ModesAscReadResult: """Parsed ASCII ``.mod`` (Matlab ``read_modes_asc``).""" lrecl: int title: str freq: float nmedia: int ntot: int nmat: int num_modes: int z: np.ndarray # float32, shape (ntot,) k: np.ndarray # complex128, shape (n_read,) — selected modes only phi: np.ndarray # complex64, shape (ntot, n_read)
[docs] @dataclass class ModesReadResult: """Parsed ``.mod`` contents for one frequency block.""" title: str nfreq: int nmedia: int ntot: int nmat: int n_per_medium: np.ndarray # int32, shape (nmedia,) mater: list[bytes] # 8-byte labels per medium depth: np.ndarray # float32, interfaces / medium tops rho: np.ndarray # float32, density per medium freq_vec: np.ndarray # float64 z: np.ndarray # float32, depth grid for modes num_modes: int # number of modes at selected frequency (Matlab ``Modes.M``) top: ModeHalfspace bot: ModeHalfspace phi: np.ndarray # complex64, shape (nmat, n_read) k: np.ndarray # complex128, shape (n_read,) — selected modes only
def _rec0_offset(lrecl: int) -> int: """Byte offset after the leading int32 record-length word.""" return 4 # skip leading record-length int (same convention as Matlab) def _rec_n_offset(rec: int, lrecl: int) -> int: """Byte offset of Fortran direct-access record ``rec`` (``rec==0`` skips length word).""" if rec == 0: return _rec0_offset(lrecl) return rec * lrecl
[docs] def read_modes_bin( data: bytes, freq: float, *, modes: list[int] | None = None, ) -> ModesReadResult: """Parse Kraken binary mode file bytes (port of Matlab ``read_modes_bin``). ``modes`` uses **1-based** mode indices, matching Matlab (e.g. ``[1, 2, 3]``). If omitted, all modes at the selected frequency are returned. Use ``freq=0`` when the file contains a single frequency (Matlab convention). """ if len(data) < 4: raise ValueError("mode file too short") (recl_words,) = struct.unpack_from("<i", data, 0) lrecl = 4 * int(recl_words) if lrecl <= 0 or lrecl % 4 != 0: raise ValueError(f"invalid record length: {lrecl}") off = _rec_n_offset(0, lrecl) title_bytes = data[off : off + 80] off += 80 title = title_bytes.decode("utf-8", errors="replace").rstrip("\x00").rstrip() nfreq, nmedia, ntot, nmat = struct.unpack_from("<4i", data, off) off += 16 if ntot < 0: raise ValueError("mode file indicates Ntot < 0 (unsupported / empty profile)") i_rec_profile = 1 off = i_rec_profile * lrecl n_per_medium = np.empty(nmedia, dtype=np.int32) mater: list[bytes] = [] for _m in range(nmedia): (n_per_medium[_m],) = struct.unpack_from("<i", data, off) off += 4 mater.append(data[off : off + 8]) off += 8 off = (i_rec_profile + 1) * lrecl bulk = np.frombuffer(data, dtype="<f4", count=2 * nmedia, offset=off).reshape(2, nmedia) depth = bulk[0, :].astype(np.float32, copy=True) rho_med = bulk[1, :].astype(np.float32, copy=True) off = (i_rec_profile + 2) * lrecl freq_vec = np.frombuffer(data, dtype="<f8", count=nfreq, offset=off).copy() off = (i_rec_profile + 3) * lrecl z = np.frombuffer(data, dtype="<f4", count=ntot, offset=off).astype(np.float32, copy=True) i_rec_profile += 4 rec = i_rec_profile freq_index = int(np.argmin(np.abs(freq_vec - freq))) m_count = 0 for ifreq in range(freq_index + 1): off_m = rec * lrecl (m_count,) = struct.unpack_from("<i", data, off_m) if ifreq < freq_index: # Match Matlab: floor(4*(2*M-1)/lrecl) step = 3 + m_count + (4 * (2 * m_count - 1)) // lrecl i_rec_profile += step rec = i_rec_profile if modes is None: mode_list = list(range(1, m_count + 1)) else: mode_list = list(modes) mode_list = [mm for mm in mode_list if 1 <= mm <= m_count] off_hs = (i_rec_profile + 1) * lrecl o = off_hs top_bc = data[o : o + 1].decode("latin-1", errors="replace") o += 1 cp_r, cp_i = struct.unpack_from("<2f", data, o) o += 8 cs_r, cs_i = struct.unpack_from("<2f", data, o) o += 8 top_rho, top_depth = struct.unpack_from("<2f", data, o) o += 8 bot_bc = data[o : o + 1].decode("latin-1", errors="replace") o += 1 bcp_r, bcp_i = struct.unpack_from("<2f", data, o) o += 8 bcs_r, bcs_i = struct.unpack_from("<2f", data, o) o += 8 bot_rho, bot_depth = struct.unpack_from("<2f", data, o) top = ModeHalfspace( bc=top_bc, cp=np.complex128(complex(cp_r, cp_i)), cs=np.complex128(complex(cs_r, cs_i)), rho=np.float32(top_rho), depth=np.float32(top_depth), ) bot = ModeHalfspace( bc=bot_bc, cp=np.complex128(complex(bcp_r, bcp_i)), cs=np.complex128(complex(bcs_r, bcs_i)), rho=np.float32(bot_rho), depth=np.float32(bot_depth), ) if m_count == 0: phi = np.zeros((nmat, 0), dtype=np.complex64) k_sel = np.zeros(0, dtype=np.complex128) else: phi = np.zeros((nmat, len(mode_list)), dtype=np.complex64) for col, mode_num in enumerate(mode_list): off_phi = (i_rec_profile + 1 + mode_num) * lrecl raw = np.frombuffer(data, dtype="<f4", count=2 * nmat, offset=off_phi) real = raw[0::2] imag = raw[1::2] phi[:, col] = (real + 1j * imag).astype(np.complex64) off_k = (i_rec_profile + 2 + m_count) * lrecl k_all = np.frombuffer(data, dtype="<f4", count=2 * m_count, offset=off_k) k_re = k_all[0::2] k_im = k_all[1::2] k_full = (k_re + 1j * k_im).astype(np.complex128) idx0 = [mm - 1 for mm in mode_list] k_sel = k_full[idx0] return ModesReadResult( title=title, nfreq=nfreq, nmedia=nmedia, ntot=ntot, nmat=nmat, n_per_medium=n_per_medium, mater=mater, depth=depth, rho=rho_med, freq_vec=freq_vec, z=z, num_modes=m_count, top=top, bot=bot, phi=phi, k=k_sel, )
class _FloatLineCursor: """Sequential floats with ``fgetl``-style line skips (Matlab ``fscanf`` + ``fgetl``).""" __slots__ = ("_lines", "_li", "_pending") def __init__(self, lines: list[str], start_li: int) -> None: """Parse floats from ``lines`` starting at line ``start_li``.""" self._lines = lines self._li = start_li self._pending: list[str] = [] def _refill(self) -> None: """Pull tokens from subsequent lines into ``_pending`` (strip ``!`` comments).""" while self._li < len(self._lines) and not self._pending: part = self._lines[self._li].split("!", 1)[0] for t in part.replace(",", " ").split(): t = t.strip() if t: self._pending.append(t) self._li += 1 def take(self, n: int) -> np.ndarray: """Pop ``n`` float tokens (may span lines).""" out: list[float] = [] while len(out) < n: self._refill() if not self._pending: raise ValueError(f"expected {n} float(s) in ASCII mode file, got {len(out)}") out.append(float(self._pending.pop(0))) return np.array(out, dtype=np.float64) def skip_line(self) -> None: """Discard pending tokens and advance one physical line.""" self._pending.clear() if self._li < len(self._lines): self._li += 1
[docs] def read_modes_asc( data: str | bytes, *, modes: list[int] | None = None, encoding: str = "utf-8", ) -> ModesAscReadResult: """Parse Kraken **ASCII** mode file text (Matlab ``read_modes_asc.m``). ``modes`` lists **1-based** mode indices to keep (default: all modes in the file). """ if isinstance(data, bytes): text = data.decode(encoding, errors="replace") else: text = data lines = text.splitlines() if len(lines) < 3: raise ValueError("ASCII mode file too short") lrecl = int(float(lines[0].split("!")[0].replace(",", " ").split()[0])) title = lines[1].strip() cur = _FloatLineCursor(lines, 2) head = cur.take(5) freq = float(head[0]) nmedia = int(head[1]) ntot = int(head[2]) nmat = int(head[3]) m_count = int(head[4]) for _ in range(nmedia): cur.skip_line() cur.skip_line() cur.skip_line() cur.skip_line() z = cur.take(ntot).astype(np.float32) cur.skip_line() k_flat = cur.take(2 * m_count) k_full = k_flat[0::2] + 1j * k_flat[1::2] if modes is None: mode_list = list(range(1, m_count + 1)) else: mode_list = list(modes) mode_list = [mm for mm in mode_list if 1 <= mm <= m_count] if not mode_list: k_sel = np.zeros(0, dtype=np.complex128) phi = np.zeros((ntot, 0), dtype=np.complex64) return ModesAscReadResult( lrecl=lrecl, title=title, freq=freq, nmedia=nmedia, ntot=ntot, nmat=nmat, num_modes=m_count, z=z, k=k_sel, phi=phi, ) k_sel = k_full[np.array(mode_list, dtype=np.int64) - 1].astype(np.complex128) phi = np.zeros((ntot, len(mode_list)), dtype=np.complex64) want = set(mode_list) max_m = max(mode_list) for mode_idx in range(1, max_m + 1): cur.skip_line() flat = cur.take(2 * ntot) if mode_idx in want: col = mode_list.index(mode_idx) phi[:, col] = (flat[0::2] + 1j * flat[1::2]).astype(np.complex64) return ModesAscReadResult( lrecl=lrecl, title=title, freq=freq, nmedia=nmedia, ntot=ntot, nmat=nmat, num_modes=m_count, z=z, k=k_sel, phi=phi, )
[docs] def read_modes_asc_bytes( data: bytes, *, modes: list[int] | None = None, encoding: str = "utf-8", ) -> ModesAscReadResult: """Like :func:`read_modes_asc` for encoded bytes.""" return read_modes_asc(data, modes=modes, encoding=encoding)
[docs] def read_modes( data: bytes | str, *, freq: float = 0.0, modes: list[int] | None = None, file_type: Literal["mod", "moa"] | None = None, encoding: str = "utf-8", ) -> ModesReadResult | ModesAscReadResult: """Dispatch mode parsing like Matlab ``read_modes.m`` for in-memory payloads. - ``file_type='mod'``: binary ``read_modes_bin`` - ``file_type='moa'``: ASCII ``read_modes_asc`` - ``file_type=None``: infer from payload type/content (bytes->binary first, str->ASCII). """ if file_type == "mod": if isinstance(data, str): data = data.encode(encoding) return read_modes_bin(data, freq=freq, modes=modes) if file_type == "moa": return read_modes_asc(data, modes=modes, encoding=encoding) if file_type is not None: raise ValueError(f"unsupported modes file_type {file_type!r}; expected 'mod' or 'moa'") if isinstance(data, str): return read_modes_asc(data, modes=modes, encoding=encoding) # bytes: attempt binary first (common path); if it fails, fall back to ASCII decode. try: return read_modes_bin(data, freq=freq, modes=modes) except Exception: return read_modes_asc(data, modes=modes, encoding=encoding)
[docs] def read_modes_bytes( data: bytes, *, freq: float = 0.0, modes: list[int] | None = None, file_type: Literal["mod", "moa"] | None = None, encoding: str = "utf-8", ) -> ModesReadResult | ModesAscReadResult: """Like :func:`read_modes` for ``bytes`` input only.""" return read_modes(data, freq=freq, modes=modes, file_type=file_type, encoding=encoding)