"""Kraken mode file readers: binary (``read_modes_bin.m``) and ASCII (``read_modes_asc.m``)."""
from __future__ import annotations
import struct
from dataclasses import dataclass
from typing import Literal
import numpy as np
[docs]
@dataclass
class ModeHalfspace:
"""Top/bottom acoustic half-space coefficients from binary ``.mod`` (Matlab ``Top``/``Bot``)."""
bc: str
cp: np.complex128
cs: np.complex128
rho: np.float32
depth: np.float32
[docs]
@dataclass
class ModesAscReadResult:
"""Parsed ASCII ``.mod`` (Matlab ``read_modes_asc``)."""
lrecl: int
title: str
freq: float
nmedia: int
ntot: int
nmat: int
num_modes: int
z: np.ndarray # float32, shape (ntot,)
k: np.ndarray # complex128, shape (n_read,) — selected modes only
phi: np.ndarray # complex64, shape (ntot, n_read)
[docs]
@dataclass
class ModesReadResult:
"""Parsed ``.mod`` contents for one frequency block."""
title: str
nfreq: int
nmedia: int
ntot: int
nmat: int
n_per_medium: np.ndarray # int32, shape (nmedia,)
mater: list[bytes] # 8-byte labels per medium
depth: np.ndarray # float32, interfaces / medium tops
rho: np.ndarray # float32, density per medium
freq_vec: np.ndarray # float64
z: np.ndarray # float32, depth grid for modes
num_modes: int # number of modes at selected frequency (Matlab ``Modes.M``)
top: ModeHalfspace
bot: ModeHalfspace
phi: np.ndarray # complex64, shape (nmat, n_read)
k: np.ndarray # complex128, shape (n_read,) — selected modes only
def _rec0_offset(lrecl: int) -> int:
"""Byte offset after the leading int32 record-length word."""
return 4 # skip leading record-length int (same convention as Matlab)
def _rec_n_offset(rec: int, lrecl: int) -> int:
"""Byte offset of Fortran direct-access record ``rec`` (``rec==0`` skips length word)."""
if rec == 0:
return _rec0_offset(lrecl)
return rec * lrecl
[docs]
def read_modes_bin(
data: bytes,
freq: float,
*,
modes: list[int] | None = None,
) -> ModesReadResult:
"""Parse Kraken binary mode file bytes (port of Matlab ``read_modes_bin``).
``modes`` uses **1-based** mode indices, matching Matlab (e.g. ``[1, 2, 3]``).
If omitted, all modes at the selected frequency are returned.
Use ``freq=0`` when the file contains a single frequency (Matlab convention).
"""
if len(data) < 4:
raise ValueError("mode file too short")
(recl_words,) = struct.unpack_from("<i", data, 0)
lrecl = 4 * int(recl_words)
if lrecl <= 0 or lrecl % 4 != 0:
raise ValueError(f"invalid record length: {lrecl}")
off = _rec_n_offset(0, lrecl)
title_bytes = data[off : off + 80]
off += 80
title = title_bytes.decode("utf-8", errors="replace").rstrip("\x00").rstrip()
nfreq, nmedia, ntot, nmat = struct.unpack_from("<4i", data, off)
off += 16
if ntot < 0:
raise ValueError("mode file indicates Ntot < 0 (unsupported / empty profile)")
i_rec_profile = 1
off = i_rec_profile * lrecl
n_per_medium = np.empty(nmedia, dtype=np.int32)
mater: list[bytes] = []
for _m in range(nmedia):
(n_per_medium[_m],) = struct.unpack_from("<i", data, off)
off += 4
mater.append(data[off : off + 8])
off += 8
off = (i_rec_profile + 1) * lrecl
bulk = np.frombuffer(data, dtype="<f4", count=2 * nmedia, offset=off).reshape(2, nmedia)
depth = bulk[0, :].astype(np.float32, copy=True)
rho_med = bulk[1, :].astype(np.float32, copy=True)
off = (i_rec_profile + 2) * lrecl
freq_vec = np.frombuffer(data, dtype="<f8", count=nfreq, offset=off).copy()
off = (i_rec_profile + 3) * lrecl
z = np.frombuffer(data, dtype="<f4", count=ntot, offset=off).astype(np.float32, copy=True)
i_rec_profile += 4
rec = i_rec_profile
freq_index = int(np.argmin(np.abs(freq_vec - freq)))
m_count = 0
for ifreq in range(freq_index + 1):
off_m = rec * lrecl
(m_count,) = struct.unpack_from("<i", data, off_m)
if ifreq < freq_index:
# Match Matlab: floor(4*(2*M-1)/lrecl)
step = 3 + m_count + (4 * (2 * m_count - 1)) // lrecl
i_rec_profile += step
rec = i_rec_profile
if modes is None:
mode_list = list(range(1, m_count + 1))
else:
mode_list = list(modes)
mode_list = [mm for mm in mode_list if 1 <= mm <= m_count]
off_hs = (i_rec_profile + 1) * lrecl
o = off_hs
top_bc = data[o : o + 1].decode("latin-1", errors="replace")
o += 1
cp_r, cp_i = struct.unpack_from("<2f", data, o)
o += 8
cs_r, cs_i = struct.unpack_from("<2f", data, o)
o += 8
top_rho, top_depth = struct.unpack_from("<2f", data, o)
o += 8
bot_bc = data[o : o + 1].decode("latin-1", errors="replace")
o += 1
bcp_r, bcp_i = struct.unpack_from("<2f", data, o)
o += 8
bcs_r, bcs_i = struct.unpack_from("<2f", data, o)
o += 8
bot_rho, bot_depth = struct.unpack_from("<2f", data, o)
top = ModeHalfspace(
bc=top_bc,
cp=np.complex128(complex(cp_r, cp_i)),
cs=np.complex128(complex(cs_r, cs_i)),
rho=np.float32(top_rho),
depth=np.float32(top_depth),
)
bot = ModeHalfspace(
bc=bot_bc,
cp=np.complex128(complex(bcp_r, bcp_i)),
cs=np.complex128(complex(bcs_r, bcs_i)),
rho=np.float32(bot_rho),
depth=np.float32(bot_depth),
)
if m_count == 0:
phi = np.zeros((nmat, 0), dtype=np.complex64)
k_sel = np.zeros(0, dtype=np.complex128)
else:
phi = np.zeros((nmat, len(mode_list)), dtype=np.complex64)
for col, mode_num in enumerate(mode_list):
off_phi = (i_rec_profile + 1 + mode_num) * lrecl
raw = np.frombuffer(data, dtype="<f4", count=2 * nmat, offset=off_phi)
real = raw[0::2]
imag = raw[1::2]
phi[:, col] = (real + 1j * imag).astype(np.complex64)
off_k = (i_rec_profile + 2 + m_count) * lrecl
k_all = np.frombuffer(data, dtype="<f4", count=2 * m_count, offset=off_k)
k_re = k_all[0::2]
k_im = k_all[1::2]
k_full = (k_re + 1j * k_im).astype(np.complex128)
idx0 = [mm - 1 for mm in mode_list]
k_sel = k_full[idx0]
return ModesReadResult(
title=title,
nfreq=nfreq,
nmedia=nmedia,
ntot=ntot,
nmat=nmat,
n_per_medium=n_per_medium,
mater=mater,
depth=depth,
rho=rho_med,
freq_vec=freq_vec,
z=z,
num_modes=m_count,
top=top,
bot=bot,
phi=phi,
k=k_sel,
)
class _FloatLineCursor:
"""Sequential floats with ``fgetl``-style line skips (Matlab ``fscanf`` + ``fgetl``)."""
__slots__ = ("_lines", "_li", "_pending")
def __init__(self, lines: list[str], start_li: int) -> None:
"""Parse floats from ``lines`` starting at line ``start_li``."""
self._lines = lines
self._li = start_li
self._pending: list[str] = []
def _refill(self) -> None:
"""Pull tokens from subsequent lines into ``_pending`` (strip ``!`` comments)."""
while self._li < len(self._lines) and not self._pending:
part = self._lines[self._li].split("!", 1)[0]
for t in part.replace(",", " ").split():
t = t.strip()
if t:
self._pending.append(t)
self._li += 1
def take(self, n: int) -> np.ndarray:
"""Pop ``n`` float tokens (may span lines)."""
out: list[float] = []
while len(out) < n:
self._refill()
if not self._pending:
raise ValueError(f"expected {n} float(s) in ASCII mode file, got {len(out)}")
out.append(float(self._pending.pop(0)))
return np.array(out, dtype=np.float64)
def skip_line(self) -> None:
"""Discard pending tokens and advance one physical line."""
self._pending.clear()
if self._li < len(self._lines):
self._li += 1
[docs]
def read_modes_asc(
data: str | bytes,
*,
modes: list[int] | None = None,
encoding: str = "utf-8",
) -> ModesAscReadResult:
"""Parse Kraken **ASCII** mode file text (Matlab ``read_modes_asc.m``).
``modes`` lists **1-based** mode indices to keep (default: all modes in the file).
"""
if isinstance(data, bytes):
text = data.decode(encoding, errors="replace")
else:
text = data
lines = text.splitlines()
if len(lines) < 3:
raise ValueError("ASCII mode file too short")
lrecl = int(float(lines[0].split("!")[0].replace(",", " ").split()[0]))
title = lines[1].strip()
cur = _FloatLineCursor(lines, 2)
head = cur.take(5)
freq = float(head[0])
nmedia = int(head[1])
ntot = int(head[2])
nmat = int(head[3])
m_count = int(head[4])
for _ in range(nmedia):
cur.skip_line()
cur.skip_line()
cur.skip_line()
cur.skip_line()
z = cur.take(ntot).astype(np.float32)
cur.skip_line()
k_flat = cur.take(2 * m_count)
k_full = k_flat[0::2] + 1j * k_flat[1::2]
if modes is None:
mode_list = list(range(1, m_count + 1))
else:
mode_list = list(modes)
mode_list = [mm for mm in mode_list if 1 <= mm <= m_count]
if not mode_list:
k_sel = np.zeros(0, dtype=np.complex128)
phi = np.zeros((ntot, 0), dtype=np.complex64)
return ModesAscReadResult(
lrecl=lrecl,
title=title,
freq=freq,
nmedia=nmedia,
ntot=ntot,
nmat=nmat,
num_modes=m_count,
z=z,
k=k_sel,
phi=phi,
)
k_sel = k_full[np.array(mode_list, dtype=np.int64) - 1].astype(np.complex128)
phi = np.zeros((ntot, len(mode_list)), dtype=np.complex64)
want = set(mode_list)
max_m = max(mode_list)
for mode_idx in range(1, max_m + 1):
cur.skip_line()
flat = cur.take(2 * ntot)
if mode_idx in want:
col = mode_list.index(mode_idx)
phi[:, col] = (flat[0::2] + 1j * flat[1::2]).astype(np.complex64)
return ModesAscReadResult(
lrecl=lrecl,
title=title,
freq=freq,
nmedia=nmedia,
ntot=ntot,
nmat=nmat,
num_modes=m_count,
z=z,
k=k_sel,
phi=phi,
)
[docs]
def read_modes_asc_bytes(
data: bytes,
*,
modes: list[int] | None = None,
encoding: str = "utf-8",
) -> ModesAscReadResult:
"""Like :func:`read_modes_asc` for encoded bytes."""
return read_modes_asc(data, modes=modes, encoding=encoding)
[docs]
def read_modes(
data: bytes | str,
*,
freq: float = 0.0,
modes: list[int] | None = None,
file_type: Literal["mod", "moa"] | None = None,
encoding: str = "utf-8",
) -> ModesReadResult | ModesAscReadResult:
"""Dispatch mode parsing like Matlab ``read_modes.m`` for in-memory payloads.
- ``file_type='mod'``: binary ``read_modes_bin``
- ``file_type='moa'``: ASCII ``read_modes_asc``
- ``file_type=None``: infer from payload type/content (bytes->binary first, str->ASCII).
"""
if file_type == "mod":
if isinstance(data, str):
data = data.encode(encoding)
return read_modes_bin(data, freq=freq, modes=modes)
if file_type == "moa":
return read_modes_asc(data, modes=modes, encoding=encoding)
if file_type is not None:
raise ValueError(f"unsupported modes file_type {file_type!r}; expected 'mod' or 'moa'")
if isinstance(data, str):
return read_modes_asc(data, modes=modes, encoding=encoding)
# bytes: attempt binary first (common path); if it fails, fall back to ASCII decode.
try:
return read_modes_bin(data, freq=freq, modes=modes)
except Exception:
return read_modes_asc(data, modes=modes, encoding=encoding)
[docs]
def read_modes_bytes(
data: bytes,
*,
freq: float = 0.0,
modes: list[int] | None = None,
file_type: Literal["mod", "moa"] | None = None,
encoding: str = "utf-8",
) -> ModesReadResult | ModesAscReadResult:
"""Like :func:`read_modes` for ``bytes`` input only."""
return read_modes(data, freq=freq, modes=modes, file_type=file_type, encoding=encoding)