from __future__ import annotations
import struct
from dataclasses import dataclass
from typing import Literal
import numpy as np
from at_py.readwrite.fortran_records import FortranRecordReader, autodetect_marker_len
@dataclass(frozen=True)
class ArrivalsPos2D:
"""2D arrivals geometry: frequency and source/receiver depth and range grids."""
freq: float
s_z: np.ndarray # float32
r_z: np.ndarray # float32
r_r: np.ndarray # float64
@dataclass(frozen=True)
class ArrivalsPos3D:
"""3D arrivals geometry: frequency, 3D source grid, receiver depth/range/azimuth."""
freq: float
s_x: np.ndarray # float64
s_y: np.ndarray # float64
s_z: np.ndarray # float32
r_z: np.ndarray # float32
r_r: np.ndarray # float64
r_theta: np.ndarray # float64
@dataclass(frozen=True)
class ArrivalsCell2D:
"""One 2D arrival cell: complex amplitude/delay rays and declination metadata."""
narr: int
A: np.ndarray # complex64, shape (narr,)
delay: np.ndarray # complex64, shape (narr,)
src_decl: np.ndarray # float32
rcvr_decl: np.ndarray # float32
num_top: np.ndarray # int16
num_bot: np.ndarray # int16
@dataclass(frozen=True)
class ArrivalsCell3D:
"""One 3D arrival cell (adds source/receiver azimuth vs 2D)."""
narr: int
A: np.ndarray # complex64
delay: np.ndarray # complex64
src_decl: np.ndarray # float32
src_azim: np.ndarray # float32
rcvr_decl: np.ndarray # float32
rcvr_azim: np.ndarray # float32
num_top: np.ndarray # int16
num_bot: np.ndarray # int16
@dataclass(frozen=True)
class Arrivals2D:
"""Full 2D arrivals file: geometry plus per-cell ray data on a 3-D index grid."""
pos: ArrivalsPos2D
arr: np.ndarray # dtype=object, shape (nrr, nrz, nsz) of ArrivalsCell2D
@dataclass(frozen=True)
class Arrivals3D:
"""Full 3D arrivals file: geometry plus object array of :class:`ArrivalsCell3D`."""
pos: ArrivalsPos3D
arr: np.ndarray # dtype=object, shape (nrr, nrz, nrtheta, nsz) of ArrivalsCell3D
class _AscTok:
"""Whitespace token stream for Bellhop ASCII arrivals (Matlab ``fscanf`` order)."""
__slots__ = ("_t", "_i")
def __init__(self, text: str) -> None:
"""Build a whitespace token stream from ASCII arrivals text."""
self._t = text.split()
self._i = 0
def pop_word(self) -> str:
"""Return the next whitespace-delimited token."""
if self._i >= len(self._t):
raise ValueError("unexpected end of ASCII arrivals file")
w = self._t[self._i]
self._i += 1
return w
def pop_int(self) -> int:
"""Pop one token and parse as integer (via float, matching Matlab)."""
return int(float(self.pop_word()))
def pop_f32(self) -> np.ndarray:
"""Read count ``N`` then ``N`` float32 values."""
n = self.pop_int()
return np.array([float(self.pop_word()) for _ in range(n)], dtype=np.float32)
def pop_f64(self) -> np.ndarray:
"""Read count ``N`` then ``N`` float64 values."""
n = self.pop_int()
return np.array([float(self.pop_word()) for _ in range(n)], dtype=np.float64)
[docs]
def read_arrivals_asc(data: str | bytes, *, encoding: str = "utf-8") -> Arrivals2D | Arrivals3D:
"""Parse Bellhop/Bellhop3D **ASCII** arrivals (port of ``read_arrivals_asc.m``)."""
if isinstance(data, bytes):
text = data.decode(encoding, errors="replace")
else:
text = data
tok = _AscTok(text)
flag = tok.pop_word()
if flag == "'2D'":
return _read_arrivals_asc_2d(tok)
if flag == "'3D'":
return _read_arrivals_asc_3d(tok)
raise ValueError("not an ASCII format Arrivals file?")
[docs]
def read_arrivals_asc_bytes(data: bytes, *, encoding: str = "utf-8") -> Arrivals2D | Arrivals3D:
"""Like :func:`read_arrivals_asc` for UTF-8 (or other) encoded bytes."""
return read_arrivals_asc(data, encoding=encoding)
def _read_arrivals_asc_2d(tok: _AscTok) -> Arrivals2D:
"""Parse 2D ASCII arrivals after the ``'2D'`` flag (Matlab ``read_arrivals_asc``)."""
freq = float(tok.pop_word())
s_z = tok.pop_f32()
r_z = tok.pop_f32()
r_r = tok.pop_f64()
nsz, nrz, nrr = len(s_z), len(r_z), len(r_r)
pos = ArrivalsPos2D(
freq=float(freq),
s_z=s_z,
r_z=r_z,
r_r=r_r,
)
arr = np.empty((nrr, nrz, nsz), dtype=object)
for isd in range(nsz):
_ = tok.pop_int() # Narrmx2
for irz in range(nrz):
for irr in range(nrr):
narr = tok.pop_int()
if narr <= 0:
arr[irr, irz, isd] = ArrivalsCell2D(
narr=0,
A=np.zeros(0, dtype=np.complex64),
delay=np.zeros(0, dtype=np.complex64),
src_decl=np.zeros(0, dtype=np.float32),
rcvr_decl=np.zeros(0, dtype=np.float32),
num_top=np.zeros(0, dtype=np.int16),
num_bot=np.zeros(0, dtype=np.int16),
)
continue
da = np.empty((8, narr), dtype=np.float32)
for j in range(narr):
for i in range(8):
da[i, j] = float(tok.pop_word())
mags = da[0, :]
phase_deg = da[1, :]
d_re = da[2, :]
d_im = da[3, :]
src_decl = da[4, :]
rcvr_decl = da[5, :]
num_top = da[6, :].astype(np.int16)
num_bot = da[7, :].astype(np.int16)
A = (mags * np.exp(1j * phase_deg * np.pi / 180.0)).astype(np.complex64)
delay = (d_re + 1j * d_im).astype(np.complex64)
arr[irr, irz, isd] = ArrivalsCell2D(
narr=int(narr),
A=A,
delay=delay,
src_decl=src_decl.astype(np.float32),
rcvr_decl=rcvr_decl.astype(np.float32),
num_top=num_top,
num_bot=num_bot,
)
return Arrivals2D(pos=pos, arr=arr)
def _read_arrivals_asc_3d(tok: _AscTok) -> Arrivals3D:
"""Parse 3D ASCII arrivals after the ``'3D'`` flag."""
freq = float(tok.pop_word())
s_x = tok.pop_f64()
s_y = tok.pop_f64()
s_z = tok.pop_f32() # Matlab uses %f for z grids; binary uses float32
r_z = tok.pop_f32()
r_r = tok.pop_f64()
r_theta = tok.pop_f64()
nsz = len(s_z)
nrz, nrr, nrtheta = len(r_z), len(r_r), len(r_theta)
pos = ArrivalsPos3D(
freq=float(freq),
s_x=s_x,
s_y=s_y,
s_z=s_z,
r_z=r_z,
r_r=r_r,
r_theta=r_theta,
)
arr = np.empty((nrr, nrz, nrtheta, nsz), dtype=object)
for isd in range(nsz):
_ = tok.pop_int() # Narrmx2
for irtheta in range(nrtheta):
for irz in range(nrz):
for irr in range(nrr):
narr = tok.pop_int()
if narr <= 0:
arr[irr, irz, irtheta, isd] = ArrivalsCell3D(
narr=0,
A=np.zeros(0, dtype=np.complex64),
delay=np.zeros(0, dtype=np.complex64),
src_decl=np.zeros(0, dtype=np.float32),
src_azim=np.zeros(0, dtype=np.float32),
rcvr_decl=np.zeros(0, dtype=np.float32),
rcvr_azim=np.zeros(0, dtype=np.float32),
num_top=np.zeros(0, dtype=np.int16),
num_bot=np.zeros(0, dtype=np.int16),
)
continue
da = np.empty((10, narr), dtype=np.float32)
for j in range(narr):
for i in range(10):
da[i, j] = float(tok.pop_word())
mags = da[0, :]
phase_deg = da[1, :]
d_re = da[2, :]
d_im = da[3, :]
src_decl = da[4, :]
src_azim = da[5, :]
rcvr_decl = da[6, :]
rcvr_azim = da[7, :]
num_top = da[8, :].astype(np.int16)
num_bot = da[9, :].astype(np.int16)
A = (mags * np.exp(1j * phase_deg * np.pi / 180.0)).astype(np.complex64)
delay = (d_re + 1j * d_im).astype(np.complex64)
arr[irr, irz, irtheta, isd] = ArrivalsCell3D(
narr=int(narr),
A=A,
delay=delay,
src_decl=src_decl.astype(np.float32),
src_azim=src_azim.astype(np.float32),
rcvr_decl=rcvr_decl.astype(np.float32),
rcvr_azim=rcvr_azim.astype(np.float32),
num_top=num_top,
num_bot=num_bot,
)
return Arrivals3D(pos=pos, arr=arr)
def _read_count_and_array(payload: bytes, count_dtype: str, array_dtype: str):
"""Read leading int count then ``n`` elements from ``payload`` (binary arrivals blocks)."""
(n,) = struct.unpack_from(count_dtype, payload, 0)
arr = np.frombuffer(
payload,
dtype=array_dtype,
offset=struct.calcsize(count_dtype),
count=int(n),
).copy()
return int(n), arr
[docs]
def read_arrivals_bin(data: bytes) -> Arrivals2D | Arrivals3D:
"""Parse Bellhop/Bellhop3D binary arrivals file bytes (port of `read_arrivals_bin.m`)."""
marker_len = autodetect_marker_len(data)
rr = FortranRecordReader(memoryview(data), marker_len=marker_len, offset=0)
flag = rr.read_record()
if flag == b"'2D'":
freq = struct.unpack_from("<f", rr.read_record(), 0)[0]
nsz, s_z = _read_count_and_array(rr.read_record(), "<i", "<f4")
nrz, r_z = _read_count_and_array(rr.read_record(), "<i", "<f4")
nrr, r_r = _read_count_and_array(rr.read_record(), "<i", "<f8")
pos = ArrivalsPos2D(
freq=float(freq),
s_z=s_z.astype(np.float32),
r_z=r_z.astype(np.float32),
r_r=r_r,
)
arr = np.empty((nrr, nrz, nsz), dtype=object)
for isd in range(nsz):
_ = struct.unpack_from("<i", rr.read_record(), 0)[0] # Narrmx2 (max arrivals), unused
for irz in range(nrz):
for irr in range(nrr):
narr = struct.unpack_from("<i", rr.read_record(), 0)[0]
if narr <= 0:
arr[irr, irz, isd] = ArrivalsCell2D(
narr=0,
A=np.zeros(0, dtype=np.complex64),
delay=np.zeros(0, dtype=np.complex64),
src_decl=np.zeros(0, dtype=np.float32),
rcvr_decl=np.zeros(0, dtype=np.float32),
num_top=np.zeros(0, dtype=np.int16),
num_bot=np.zeros(0, dtype=np.int16),
)
continue
mags = np.empty(narr, dtype=np.float32)
phase_deg = np.empty(narr, dtype=np.float32)
d_re = np.empty(narr, dtype=np.float32)
d_im = np.empty(narr, dtype=np.float32)
src_decl = np.empty(narr, dtype=np.float32)
rcvr_decl = np.empty(narr, dtype=np.float32)
num_top = np.empty(narr, dtype=np.int16)
num_bot = np.empty(narr, dtype=np.int16)
for j in range(narr):
p = rr.read_record()
if len(p) != 8 * 4:
raise ValueError("unexpected 2D arrival payload length")
vals = struct.unpack_from("<8f", p, 0)
mags[j] = vals[0]
phase_deg[j] = vals[1]
d_re[j] = vals[2]
d_im[j] = vals[3]
src_decl[j] = vals[4]
rcvr_decl[j] = vals[5]
num_top[j] = int(vals[6])
num_bot[j] = int(vals[7])
A = (mags * np.exp(1j * phase_deg * np.pi / 180.0)).astype(np.complex64)
delay = (d_re + 1j * d_im).astype(np.complex64)
arr[irr, irz, isd] = ArrivalsCell2D(
narr=int(narr),
A=A,
delay=delay,
src_decl=src_decl,
rcvr_decl=rcvr_decl,
num_top=num_top,
num_bot=num_bot,
)
return Arrivals2D(pos=pos, arr=arr)
if flag == b"'3D'":
freq = struct.unpack_from("<f", rr.read_record(), 0)[0]
nsx, s_x = _read_count_and_array(rr.read_record(), "<i", "<f8")
nsy, s_y = _read_count_and_array(rr.read_record(), "<i", "<f8")
nsz, s_z = _read_count_and_array(rr.read_record(), "<i", "<f4")
nrz, r_z = _read_count_and_array(rr.read_record(), "<i", "<f4")
nrr, r_r = _read_count_and_array(rr.read_record(), "<i", "<f8")
nrtheta, r_theta = _read_count_and_array(rr.read_record(), "<i", "<f8")
pos = ArrivalsPos3D(
freq=float(freq),
s_x=s_x,
s_y=s_y,
s_z=s_z.astype(np.float32),
r_z=r_z.astype(np.float32),
r_r=r_r,
r_theta=r_theta,
)
arr = np.empty((nrr, nrz, nrtheta, nsz), dtype=object)
for isd in range(nsz):
_ = struct.unpack_from("<i", rr.read_record(), 0)[0] # Narrmx2, unused
for irtheta in range(nrtheta):
for irz in range(nrz):
for irr in range(nrr):
narr = struct.unpack_from("<i", rr.read_record(), 0)[0]
if narr <= 0:
arr[irr, irz, irtheta, isd] = ArrivalsCell3D(
narr=0,
A=np.zeros(0, dtype=np.complex64),
delay=np.zeros(0, dtype=np.complex64),
src_decl=np.zeros(0, dtype=np.float32),
src_azim=np.zeros(0, dtype=np.float32),
rcvr_decl=np.zeros(0, dtype=np.float32),
rcvr_azim=np.zeros(0, dtype=np.float32),
num_top=np.zeros(0, dtype=np.int16),
num_bot=np.zeros(0, dtype=np.int16),
)
continue
mags = np.empty(narr, dtype=np.float32)
phase_deg = np.empty(narr, dtype=np.float32)
d_re = np.empty(narr, dtype=np.float32)
d_im = np.empty(narr, dtype=np.float32)
src_decl = np.empty(narr, dtype=np.float32)
src_azim = np.empty(narr, dtype=np.float32)
rcvr_decl = np.empty(narr, dtype=np.float32)
rcvr_azim = np.empty(narr, dtype=np.float32)
num_top = np.empty(narr, dtype=np.int16)
num_bot = np.empty(narr, dtype=np.int16)
for j in range(narr):
p = rr.read_record()
if len(p) != 10 * 4:
raise ValueError("unexpected 3D arrival payload length")
vals = struct.unpack_from("<10f", p, 0)
mags[j] = vals[0]
phase_deg[j] = vals[1]
d_re[j] = vals[2]
d_im[j] = vals[3]
src_decl[j] = vals[4]
src_azim[j] = vals[5]
rcvr_decl[j] = vals[6]
rcvr_azim[j] = vals[7]
num_top[j] = int(vals[8])
num_bot[j] = int(vals[9])
A = (mags * np.exp(1j * phase_deg * np.pi / 180.0)).astype(np.complex64)
delay = (d_re + 1j * d_im).astype(np.complex64)
arr[irr, irz, irtheta, isd] = ArrivalsCell3D(
narr=int(narr),
A=A,
delay=delay,
src_decl=src_decl,
src_azim=src_azim,
rcvr_decl=rcvr_decl,
rcvr_azim=rcvr_azim,
num_top=num_top,
num_bot=num_bot,
)
return Arrivals3D(pos=pos, arr=arr)
raise ValueError("not a BINARY format Arrivals file?")
[docs]
def read_arrivals(
data: bytes | str,
*,
file_type: Literal["bin", "asc"] | None = None,
encoding: str = "utf-8",
) -> Arrivals2D | Arrivals3D:
"""Dispatch arrivals parsing (binary vs ASCII) for in-memory payloads."""
if file_type == "bin":
if isinstance(data, str):
data = data.encode(encoding)
return read_arrivals_bin(data)
if file_type == "asc":
return read_arrivals_asc(data, encoding=encoding)
if file_type is not None:
raise ValueError(f"unsupported arrivals file_type {file_type!r}; expected 'bin' or 'asc'")
if isinstance(data, str):
return read_arrivals_asc(data, encoding=encoding)
try:
return read_arrivals_bin(data)
except ValueError:
return read_arrivals_asc(data, encoding=encoding)
[docs]
def read_arrivals_bytes(
data: bytes,
*,
file_type: Literal["bin", "asc"] | None = None,
encoding: str = "utf-8",
) -> Arrivals2D | Arrivals3D:
"""Like :func:`read_arrivals` for ``bytes`` input only."""
return read_arrivals(data, file_type=file_type, encoding=encoding)