Source code for at_py.readwrite.arrivals

from __future__ import annotations

import struct
from dataclasses import dataclass
from typing import Literal

import numpy as np

from at_py.readwrite.fortran_records import FortranRecordReader, autodetect_marker_len


@dataclass(frozen=True)
class ArrivalsPos2D:
    """2D arrivals geometry: frequency and source/receiver depth and range grids."""

    freq: float
    s_z: np.ndarray  # float32
    r_z: np.ndarray  # float32
    r_r: np.ndarray  # float64


@dataclass(frozen=True)
class ArrivalsPos3D:
    """3D arrivals geometry: frequency, 3D source grid, receiver depth/range/azimuth."""

    freq: float
    s_x: np.ndarray  # float64
    s_y: np.ndarray  # float64
    s_z: np.ndarray  # float32
    r_z: np.ndarray  # float32
    r_r: np.ndarray  # float64
    r_theta: np.ndarray  # float64


@dataclass(frozen=True)
class ArrivalsCell2D:
    """One 2D arrival cell: complex amplitude/delay rays and declination metadata."""

    narr: int
    A: np.ndarray  # complex64, shape (narr,)
    delay: np.ndarray  # complex64, shape (narr,)
    src_decl: np.ndarray  # float32
    rcvr_decl: np.ndarray  # float32
    num_top: np.ndarray  # int16
    num_bot: np.ndarray  # int16


@dataclass(frozen=True)
class ArrivalsCell3D:
    """One 3D arrival cell (adds source/receiver azimuth vs 2D)."""

    narr: int
    A: np.ndarray  # complex64
    delay: np.ndarray  # complex64
    src_decl: np.ndarray  # float32
    src_azim: np.ndarray  # float32
    rcvr_decl: np.ndarray  # float32
    rcvr_azim: np.ndarray  # float32
    num_top: np.ndarray  # int16
    num_bot: np.ndarray  # int16


@dataclass(frozen=True)
class Arrivals2D:
    """Full 2D arrivals file: geometry plus per-cell ray data on a 3-D index grid."""

    pos: ArrivalsPos2D
    arr: np.ndarray  # dtype=object, shape (nrr, nrz, nsz) of ArrivalsCell2D


@dataclass(frozen=True)
class Arrivals3D:
    """Full 3D arrivals file: geometry plus object array of :class:`ArrivalsCell3D`."""

    pos: ArrivalsPos3D
    arr: np.ndarray  # dtype=object, shape (nrr, nrz, nrtheta, nsz) of ArrivalsCell3D


class _AscTok:
    """Whitespace token stream for Bellhop ASCII arrivals (Matlab ``fscanf`` order)."""

    __slots__ = ("_t", "_i")

    def __init__(self, text: str) -> None:
        """Build a whitespace token stream from ASCII arrivals text."""
        self._t = text.split()
        self._i = 0

    def pop_word(self) -> str:
        """Return the next whitespace-delimited token."""
        if self._i >= len(self._t):
            raise ValueError("unexpected end of ASCII arrivals file")
        w = self._t[self._i]
        self._i += 1
        return w

    def pop_int(self) -> int:
        """Pop one token and parse as integer (via float, matching Matlab)."""
        return int(float(self.pop_word()))

    def pop_f32(self) -> np.ndarray:
        """Read count ``N`` then ``N`` float32 values."""
        n = self.pop_int()
        return np.array([float(self.pop_word()) for _ in range(n)], dtype=np.float32)

    def pop_f64(self) -> np.ndarray:
        """Read count ``N`` then ``N`` float64 values."""
        n = self.pop_int()
        return np.array([float(self.pop_word()) for _ in range(n)], dtype=np.float64)


[docs] def read_arrivals_asc(data: str | bytes, *, encoding: str = "utf-8") -> Arrivals2D | Arrivals3D: """Parse Bellhop/Bellhop3D **ASCII** arrivals (port of ``read_arrivals_asc.m``).""" if isinstance(data, bytes): text = data.decode(encoding, errors="replace") else: text = data tok = _AscTok(text) flag = tok.pop_word() if flag == "'2D'": return _read_arrivals_asc_2d(tok) if flag == "'3D'": return _read_arrivals_asc_3d(tok) raise ValueError("not an ASCII format Arrivals file?")
[docs] def read_arrivals_asc_bytes(data: bytes, *, encoding: str = "utf-8") -> Arrivals2D | Arrivals3D: """Like :func:`read_arrivals_asc` for UTF-8 (or other) encoded bytes.""" return read_arrivals_asc(data, encoding=encoding)
def _read_arrivals_asc_2d(tok: _AscTok) -> Arrivals2D: """Parse 2D ASCII arrivals after the ``'2D'`` flag (Matlab ``read_arrivals_asc``).""" freq = float(tok.pop_word()) s_z = tok.pop_f32() r_z = tok.pop_f32() r_r = tok.pop_f64() nsz, nrz, nrr = len(s_z), len(r_z), len(r_r) pos = ArrivalsPos2D( freq=float(freq), s_z=s_z, r_z=r_z, r_r=r_r, ) arr = np.empty((nrr, nrz, nsz), dtype=object) for isd in range(nsz): _ = tok.pop_int() # Narrmx2 for irz in range(nrz): for irr in range(nrr): narr = tok.pop_int() if narr <= 0: arr[irr, irz, isd] = ArrivalsCell2D( narr=0, A=np.zeros(0, dtype=np.complex64), delay=np.zeros(0, dtype=np.complex64), src_decl=np.zeros(0, dtype=np.float32), rcvr_decl=np.zeros(0, dtype=np.float32), num_top=np.zeros(0, dtype=np.int16), num_bot=np.zeros(0, dtype=np.int16), ) continue da = np.empty((8, narr), dtype=np.float32) for j in range(narr): for i in range(8): da[i, j] = float(tok.pop_word()) mags = da[0, :] phase_deg = da[1, :] d_re = da[2, :] d_im = da[3, :] src_decl = da[4, :] rcvr_decl = da[5, :] num_top = da[6, :].astype(np.int16) num_bot = da[7, :].astype(np.int16) A = (mags * np.exp(1j * phase_deg * np.pi / 180.0)).astype(np.complex64) delay = (d_re + 1j * d_im).astype(np.complex64) arr[irr, irz, isd] = ArrivalsCell2D( narr=int(narr), A=A, delay=delay, src_decl=src_decl.astype(np.float32), rcvr_decl=rcvr_decl.astype(np.float32), num_top=num_top, num_bot=num_bot, ) return Arrivals2D(pos=pos, arr=arr) def _read_arrivals_asc_3d(tok: _AscTok) -> Arrivals3D: """Parse 3D ASCII arrivals after the ``'3D'`` flag.""" freq = float(tok.pop_word()) s_x = tok.pop_f64() s_y = tok.pop_f64() s_z = tok.pop_f32() # Matlab uses %f for z grids; binary uses float32 r_z = tok.pop_f32() r_r = tok.pop_f64() r_theta = tok.pop_f64() nsz = len(s_z) nrz, nrr, nrtheta = len(r_z), len(r_r), len(r_theta) pos = ArrivalsPos3D( freq=float(freq), s_x=s_x, s_y=s_y, s_z=s_z, r_z=r_z, r_r=r_r, r_theta=r_theta, ) arr = np.empty((nrr, nrz, nrtheta, nsz), dtype=object) for isd in range(nsz): _ = tok.pop_int() # Narrmx2 for irtheta in range(nrtheta): for irz in range(nrz): for irr in range(nrr): narr = tok.pop_int() if narr <= 0: arr[irr, irz, irtheta, isd] = ArrivalsCell3D( narr=0, A=np.zeros(0, dtype=np.complex64), delay=np.zeros(0, dtype=np.complex64), src_decl=np.zeros(0, dtype=np.float32), src_azim=np.zeros(0, dtype=np.float32), rcvr_decl=np.zeros(0, dtype=np.float32), rcvr_azim=np.zeros(0, dtype=np.float32), num_top=np.zeros(0, dtype=np.int16), num_bot=np.zeros(0, dtype=np.int16), ) continue da = np.empty((10, narr), dtype=np.float32) for j in range(narr): for i in range(10): da[i, j] = float(tok.pop_word()) mags = da[0, :] phase_deg = da[1, :] d_re = da[2, :] d_im = da[3, :] src_decl = da[4, :] src_azim = da[5, :] rcvr_decl = da[6, :] rcvr_azim = da[7, :] num_top = da[8, :].astype(np.int16) num_bot = da[9, :].astype(np.int16) A = (mags * np.exp(1j * phase_deg * np.pi / 180.0)).astype(np.complex64) delay = (d_re + 1j * d_im).astype(np.complex64) arr[irr, irz, irtheta, isd] = ArrivalsCell3D( narr=int(narr), A=A, delay=delay, src_decl=src_decl.astype(np.float32), src_azim=src_azim.astype(np.float32), rcvr_decl=rcvr_decl.astype(np.float32), rcvr_azim=rcvr_azim.astype(np.float32), num_top=num_top, num_bot=num_bot, ) return Arrivals3D(pos=pos, arr=arr) def _read_count_and_array(payload: bytes, count_dtype: str, array_dtype: str): """Read leading int count then ``n`` elements from ``payload`` (binary arrivals blocks).""" (n,) = struct.unpack_from(count_dtype, payload, 0) arr = np.frombuffer( payload, dtype=array_dtype, offset=struct.calcsize(count_dtype), count=int(n), ).copy() return int(n), arr
[docs] def read_arrivals_bin(data: bytes) -> Arrivals2D | Arrivals3D: """Parse Bellhop/Bellhop3D binary arrivals file bytes (port of `read_arrivals_bin.m`).""" marker_len = autodetect_marker_len(data) rr = FortranRecordReader(memoryview(data), marker_len=marker_len, offset=0) flag = rr.read_record() if flag == b"'2D'": freq = struct.unpack_from("<f", rr.read_record(), 0)[0] nsz, s_z = _read_count_and_array(rr.read_record(), "<i", "<f4") nrz, r_z = _read_count_and_array(rr.read_record(), "<i", "<f4") nrr, r_r = _read_count_and_array(rr.read_record(), "<i", "<f8") pos = ArrivalsPos2D( freq=float(freq), s_z=s_z.astype(np.float32), r_z=r_z.astype(np.float32), r_r=r_r, ) arr = np.empty((nrr, nrz, nsz), dtype=object) for isd in range(nsz): _ = struct.unpack_from("<i", rr.read_record(), 0)[0] # Narrmx2 (max arrivals), unused for irz in range(nrz): for irr in range(nrr): narr = struct.unpack_from("<i", rr.read_record(), 0)[0] if narr <= 0: arr[irr, irz, isd] = ArrivalsCell2D( narr=0, A=np.zeros(0, dtype=np.complex64), delay=np.zeros(0, dtype=np.complex64), src_decl=np.zeros(0, dtype=np.float32), rcvr_decl=np.zeros(0, dtype=np.float32), num_top=np.zeros(0, dtype=np.int16), num_bot=np.zeros(0, dtype=np.int16), ) continue mags = np.empty(narr, dtype=np.float32) phase_deg = np.empty(narr, dtype=np.float32) d_re = np.empty(narr, dtype=np.float32) d_im = np.empty(narr, dtype=np.float32) src_decl = np.empty(narr, dtype=np.float32) rcvr_decl = np.empty(narr, dtype=np.float32) num_top = np.empty(narr, dtype=np.int16) num_bot = np.empty(narr, dtype=np.int16) for j in range(narr): p = rr.read_record() if len(p) != 8 * 4: raise ValueError("unexpected 2D arrival payload length") vals = struct.unpack_from("<8f", p, 0) mags[j] = vals[0] phase_deg[j] = vals[1] d_re[j] = vals[2] d_im[j] = vals[3] src_decl[j] = vals[4] rcvr_decl[j] = vals[5] num_top[j] = int(vals[6]) num_bot[j] = int(vals[7]) A = (mags * np.exp(1j * phase_deg * np.pi / 180.0)).astype(np.complex64) delay = (d_re + 1j * d_im).astype(np.complex64) arr[irr, irz, isd] = ArrivalsCell2D( narr=int(narr), A=A, delay=delay, src_decl=src_decl, rcvr_decl=rcvr_decl, num_top=num_top, num_bot=num_bot, ) return Arrivals2D(pos=pos, arr=arr) if flag == b"'3D'": freq = struct.unpack_from("<f", rr.read_record(), 0)[0] nsx, s_x = _read_count_and_array(rr.read_record(), "<i", "<f8") nsy, s_y = _read_count_and_array(rr.read_record(), "<i", "<f8") nsz, s_z = _read_count_and_array(rr.read_record(), "<i", "<f4") nrz, r_z = _read_count_and_array(rr.read_record(), "<i", "<f4") nrr, r_r = _read_count_and_array(rr.read_record(), "<i", "<f8") nrtheta, r_theta = _read_count_and_array(rr.read_record(), "<i", "<f8") pos = ArrivalsPos3D( freq=float(freq), s_x=s_x, s_y=s_y, s_z=s_z.astype(np.float32), r_z=r_z.astype(np.float32), r_r=r_r, r_theta=r_theta, ) arr = np.empty((nrr, nrz, nrtheta, nsz), dtype=object) for isd in range(nsz): _ = struct.unpack_from("<i", rr.read_record(), 0)[0] # Narrmx2, unused for irtheta in range(nrtheta): for irz in range(nrz): for irr in range(nrr): narr = struct.unpack_from("<i", rr.read_record(), 0)[0] if narr <= 0: arr[irr, irz, irtheta, isd] = ArrivalsCell3D( narr=0, A=np.zeros(0, dtype=np.complex64), delay=np.zeros(0, dtype=np.complex64), src_decl=np.zeros(0, dtype=np.float32), src_azim=np.zeros(0, dtype=np.float32), rcvr_decl=np.zeros(0, dtype=np.float32), rcvr_azim=np.zeros(0, dtype=np.float32), num_top=np.zeros(0, dtype=np.int16), num_bot=np.zeros(0, dtype=np.int16), ) continue mags = np.empty(narr, dtype=np.float32) phase_deg = np.empty(narr, dtype=np.float32) d_re = np.empty(narr, dtype=np.float32) d_im = np.empty(narr, dtype=np.float32) src_decl = np.empty(narr, dtype=np.float32) src_azim = np.empty(narr, dtype=np.float32) rcvr_decl = np.empty(narr, dtype=np.float32) rcvr_azim = np.empty(narr, dtype=np.float32) num_top = np.empty(narr, dtype=np.int16) num_bot = np.empty(narr, dtype=np.int16) for j in range(narr): p = rr.read_record() if len(p) != 10 * 4: raise ValueError("unexpected 3D arrival payload length") vals = struct.unpack_from("<10f", p, 0) mags[j] = vals[0] phase_deg[j] = vals[1] d_re[j] = vals[2] d_im[j] = vals[3] src_decl[j] = vals[4] src_azim[j] = vals[5] rcvr_decl[j] = vals[6] rcvr_azim[j] = vals[7] num_top[j] = int(vals[8]) num_bot[j] = int(vals[9]) A = (mags * np.exp(1j * phase_deg * np.pi / 180.0)).astype(np.complex64) delay = (d_re + 1j * d_im).astype(np.complex64) arr[irr, irz, irtheta, isd] = ArrivalsCell3D( narr=int(narr), A=A, delay=delay, src_decl=src_decl, src_azim=src_azim, rcvr_decl=rcvr_decl, rcvr_azim=rcvr_azim, num_top=num_top, num_bot=num_bot, ) return Arrivals3D(pos=pos, arr=arr) raise ValueError("not a BINARY format Arrivals file?")
[docs] def read_arrivals( data: bytes | str, *, file_type: Literal["bin", "asc"] | None = None, encoding: str = "utf-8", ) -> Arrivals2D | Arrivals3D: """Dispatch arrivals parsing (binary vs ASCII) for in-memory payloads.""" if file_type == "bin": if isinstance(data, str): data = data.encode(encoding) return read_arrivals_bin(data) if file_type == "asc": return read_arrivals_asc(data, encoding=encoding) if file_type is not None: raise ValueError(f"unsupported arrivals file_type {file_type!r}; expected 'bin' or 'asc'") if isinstance(data, str): return read_arrivals_asc(data, encoding=encoding) try: return read_arrivals_bin(data) except ValueError: return read_arrivals_asc(data, encoding=encoding)
[docs] def read_arrivals_bytes( data: bytes, *, file_type: Literal["bin", "asc"] | None = None, encoding: str = "utf-8", ) -> Arrivals2D | Arrivals3D: """Like :func:`read_arrivals` for ``bytes`` input only.""" return read_arrivals(data, file_type=file_type, encoding=encoding)