# Source code for at_py.readwrite.ram

"""RAM ``tl.grid`` reader (port of Matlab ``read_ram_tlgrid.m``)."""

from __future__ import annotations

import struct
from dataclasses import dataclass

import numpy as np


@dataclass(frozen=True, eq=False)
class RamTlGridPos:
    """Source depth and receiver depth/range grids for RAM ``tl.grid`` (meters).

    Attribute rebinding is blocked (``frozen=True``); the arrays themselves
    are ordinary ndarrays and are not made read-only here.

    Equality is value-based: two instances are equal when ``s_z`` matches and
    both receiver grids are element-wise equal.  The comparison is written by
    hand because the ``__eq__`` that ``dataclass`` would generate compares the
    ndarray fields directly, which raises for arrays holding more than one
    element.  Because ``__eq__`` is defined without ``__hash__``, instances
    are unhashable.
    """

    s_z: float  # source depth
    r_z: np.ndarray  # receiver depth grid
    r_r: np.ndarray  # receiver range grid

    def __eq__(self, other: object) -> bool:
        """Return True when *other* has the same type and equal field values."""
        if other.__class__ is not self.__class__:
            return NotImplemented
        return bool(
            self.s_z == other.s_z
            and np.array_equal(self.r_z, other.r_z)
            and np.array_equal(self.r_r, other.r_r)
        )
@dataclass(frozen=True, eq=False)
class RamTlGridResult:
    """RAM transmission-loss grid: TL (dB) and derived linear amplitude on a 2-D grid.

    Attribute rebinding is blocked (``frozen=True``); the arrays themselves
    are ordinary ndarrays and are not made read-only here.

    Equality is value-based and implemented by hand: the ``__eq__`` that
    ``dataclass`` would generate compares ndarray fields directly, which
    raises for arrays holding more than one element.  Because ``__eq__`` is
    defined without ``__hash__``, instances are unhashable.
    """

    plot_title: str
    plot_type: str
    freq: float
    atten: float
    pos: RamTlGridPos
    tl_db: np.ndarray  # shape (lz, lr); transmission loss in dB
    pressure_amp: np.ndarray  # same shape; 10 ** (-TL / 20)

    def __eq__(self, other: object) -> bool:
        """Return True when *other* has the same type and equal field values."""
        if other.__class__ is not self.__class__:
            return NotImplemented
        # The position grids are compared field by field so that the array
        # contents, not the array objects, decide the outcome.
        mine, theirs = self.pos, other.pos
        return bool(
            self.plot_title == other.plot_title
            and self.plot_type == other.plot_type
            and self.freq == other.freq
            and self.atten == other.atten
            and mine.s_z == theirs.s_z
            and np.array_equal(mine.r_z, theirs.r_z)
            and np.array_equal(mine.r_r, theirs.r_r)
            and np.array_equal(self.tl_db, other.tl_db)
            and np.array_equal(self.pressure_amp, other.pressure_amp)
        )
def _marker_fmt(data: bytes) -> tuple[str, int]:
    """Return ``(struct format, marker byte width)`` for Fortran sequential markers.

    The first record is expected to be the 60-byte header, so its opening
    length marker must read 60 as either a little-endian 8-byte or 4-byte
    unsigned integer.  The opening marker alone is ambiguous: an 8-byte
    marker also reads as 60 when only its low 4 bytes are inspected, and a
    4-byte marker followed by four zero bytes (a header whose first float is
    0.0) also reads as 60 when taken as 8 bytes.  Each candidate width is
    therefore confirmed against the header's closing marker, which sits at a
    different offset for each width.

    Raises:
        ValueError: if *data* is shorter than 8 bytes, or if no marker width
            yields a 60-byte first record.
    """
    if len(data) < 8:
        raise ValueError("tl.grid file too short for record markers")

    header_nbytes = 60
    # Widest first: when nothing can be confirmed below, the 8-byte reading
    # wins, which is what an opening-marker-only check would pick.
    candidates = []
    for fmt, width in (("<Q", 8), ("<I", 4)):
        (opening,) = struct.unpack_from(fmt, data, 0)
        if opening == header_nbytes:
            candidates.append((fmt, width))
    if not candidates:
        raise ValueError("tl.grid: could not determine record marker width (expected 60-byte header)")

    for fmt, width in candidates:
        closing_off = width + header_nbytes
        if closing_off + width > len(data):
            continue  # closing marker not present for this width
        (closing,) = struct.unpack_from(fmt, data, closing_off)
        if closing == header_nbytes:
            return fmt, width

    # Header record is truncated or its closing marker is corrupt for every
    # candidate; return the first opening match and let the caller's own
    # checks report the specific problem.
    return candidates[0]


def _read_record_marker(mfmt: str, msize: int, data: bytes, off: int, ctx: str) -> tuple[int, int]:
    """Read one record-length field; return ``(length, offset_after_marker)``.

    Args:
        mfmt: ``struct`` format of a single marker (as returned by
            :func:`_marker_fmt`).
        msize: width of the marker in bytes.
        data: full file contents.
        off: byte offset of the marker within *data*.
        ctx: short description of the marker, used in the error message.

    Raises:
        ValueError: if fewer than *msize* bytes remain at *off*.
    """
    end = off + msize
    if end > len(data):
        raise ValueError(f"tl.grid: truncated {ctx}")
    (rlen,) = struct.unpack_from(mfmt, data, off)
    return rlen, end
def read_ram_tlgrid(data: bytes) -> RamTlGridResult:
    """Parse RAM binary ``tl.grid`` bytes (Matlab ``read_ram_tlgrid``).

    The input is read as a sequence of length-delimited records: one 60-byte
    header record (fifteen little-endian 4-byte fields) followed by ``lr``
    data records, each holding ``lz`` little-endian float32 TL values.  Every
    record is framed by an opening and a closing length marker whose width is
    detected by :func:`_marker_fmt`.

    From the header, ``freq``, ``zs``, ``rmax``, ``dr``, ``ndr``, ``dz``,
    ``ndz`` and ``lz`` are used; the remaining fields are read and ignored.
    The number of range columns is ``floor(rmax / (dr * ndr))``.

    Args:
        data: full contents of a ``tl.grid`` file.

    Returns:
        A :class:`RamTlGridResult` whose ``tl_db`` is a float32 array of shape
        ``(lz, lr)`` and whose ``pressure_amp`` is ``10 ** (-tl_db / 20)``
        (computed in float64, stored as float32).

    Raises:
        ValueError: if the data is truncated, a record marker does not match
            the expected record length, or the header describes an invalid
            grid.
    """
    mfmt, msize = _marker_fmt(data)
    off = 0

    # --- header record -----------------------------------------------------
    hdr_nbytes = 60
    rlen0, off = _read_record_marker(mfmt, msize, data, off, "header opening marker")
    if rlen0 != hdr_nbytes:
        raise ValueError("tl.grid: unexpected first record length")
    hdr_fmt = "<fffffiffiffiifi"
    if off + hdr_nbytes > len(data):
        raise ValueError("tl.grid: truncated header record")
    (
        freq,
        zs,
        _zr,
        rmax,
        dr,
        ndr,
        _zmax,
        dz,
        ndz,
        _zmplt,
        _c0,
        _np,
        _ns,
        _rs,
        lz,
    ) = struct.unpack_from(hdr_fmt, data, off)
    off += hdr_nbytes
    rlen1, off = _read_record_marker(mfmt, msize, data, off, "header closing marker")
    if rlen1 != hdr_nbytes:
        raise ValueError("tl.grid: unexpected closing marker on header record")

    # --- grid dimensions ---------------------------------------------------
    # A corrupt header can make this division fail (zero step) or produce a
    # value that cannot be converted to int (inf / NaN); report all of these
    # as the same kind of error as every other malformed-file condition.
    try:
        lr = int(np.floor(float(rmax) / (float(dr) * float(ndr))))
    except (ZeroDivisionError, OverflowError, ValueError) as exc:
        raise ValueError("tl.grid: invalid range grid in header (rmax, dr, ndr)") from exc
    if lr < 0:
        raise ValueError("tl.grid: negative range count")
    lz_i = int(lz)
    if lz_i <= 0:
        raise ValueError("tl.grid: lz must be positive")

    # Make sure the payload can actually hold the declared number of records
    # before allocating an array sized from header values.
    nbytes = lz_i * 4
    record_nbytes = nbytes + 2 * msize
    n_available = (len(data) - off) // record_nbytes
    if lr > n_available:
        raise ValueError(
            f"tl.grid: truncated data: header declares {lr} TL columns "
            f"but only {n_available} complete records are present"
        )

    # --- data records: one TL column (all depths) per range step ------------
    tl = np.zeros((lz_i, lr), dtype=np.float32)
    for j in range(lr):
        rlen_open, off = _read_record_marker(
            mfmt, msize, data, off, f"data record {j} opening marker"
        )
        if rlen_open != nbytes:
            raise ValueError(f"tl.grid: data record {j} marker {rlen_open} != lz*4 ({nbytes})")
        tl[:, j] = np.frombuffer(data, dtype="<f4", count=lz_i, offset=off)
        off += nbytes
        rlen_close, off = _read_record_marker(
            mfmt, msize, data, off, f"data record {j} closing marker"
        )
        if rlen_close != nbytes:
            raise ValueError(f"tl.grid: data record {j} closing marker mismatch")

    # Receiver grids start one step away from the origin (index 1, not 0).
    r_r = np.arange(1, lr + 1, dtype=np.float64) * float(ndr) * float(dr)
    r_z = np.arange(1, lz_i + 1, dtype=np.float64) * float(ndz) * float(dz)

    pressure = np.power(10.0, -tl.astype(np.float64) / 20.0).astype(np.float32)
    return RamTlGridResult(
        plot_title="RAM",
        plot_type="rectilin",
        freq=float(freq),
        atten=0.0,
        pos=RamTlGridPos(s_z=float(zs), r_z=r_z, r_r=r_r),
        tl_db=tl,
        pressure_amp=pressure,
    )
def read_ram_tlgrid_bytes(data: bytes) -> RamTlGridResult:
    """Parse in-memory ``tl.grid`` bytes; alias of :func:`read_ram_tlgrid`.

    Exists for naming symmetry with other ``*_bytes`` APIs.  The argument is
    forwarded unchanged and the parsed result is returned as-is, so any
    exception raised by :func:`read_ram_tlgrid` propagates to the caller.
    """
    parsed = read_ram_tlgrid(data)
    return parsed