"""RAM ``tl.grid`` reader (port of Matlab ``read_ram_tlgrid.m``)."""
from __future__ import annotations
import struct
from dataclasses import dataclass
import numpy as np
[docs]
@dataclass(frozen=True)
class RamTlGridPos:
"""Vertical source depth and receiver depth/range grids for RAM ``tl.grid`` (meters)."""
s_z: float
r_z: np.ndarray
r_r: np.ndarray
[docs]
@dataclass(frozen=True)
class RamTlGridResult:
"""RAM transmission-loss grid: TL (dB) and derived linear amplitude on a 2-D grid."""
plot_title: str
plot_type: str
freq: float
atten: float
pos: RamTlGridPos
tl_db: np.ndarray # shape (lz, lr); transmission loss in dB
pressure_amp: np.ndarray # same shape; 10 ** (-TL / 20)
def _marker_fmt(data: bytes) -> tuple[str, int]:
"""Return ``(struct format, marker byte width)`` for Fortran sequential markers."""
if len(data) < 8:
raise ValueError("tl.grid file too short for record markers")
(rlen64,) = struct.unpack_from("<Q", data, 0)
if rlen64 == 60:
return "<Q", 8
(rlen32,) = struct.unpack_from("<I", data, 0)
if rlen32 == 60:
return "<I", 4
raise ValueError("tl.grid: could not determine record marker width (expected 60-byte header)")
def _read_record_marker(mfmt: str, msize: int, data: bytes, off: int, ctx: str) -> tuple[int, int]:
"""Read one record-length field; return ``(length, offset_after_marker)``."""
if off + msize > len(data):
raise ValueError(f"tl.grid: truncated {ctx}")
(rlen,) = struct.unpack_from(mfmt, data, off)
return rlen, off + msize
[docs]
def read_ram_tlgrid(data: bytes) -> RamTlGridResult:
"""Parse RAM binary ``tl.grid`` bytes (Matlab ``read_ram_tlgrid``)."""
mfmt, msize = _marker_fmt(data)
off = 0
rlen0, off = _read_record_marker(mfmt, msize, data, off, "header opening marker")
if rlen0 != 60:
raise ValueError("tl.grid: unexpected first record length")
hdr_fmt = "<fffffiffiffiifi"
if off + 60 > len(data):
raise ValueError("tl.grid: truncated header record")
vals = struct.unpack_from(hdr_fmt, data, off)
off += 60
(
freq,
zs,
_zr,
rmax,
dr,
ndr,
_zmax,
dz,
ndz,
_zmplt,
_c0,
_np,
_ns,
_rs,
lz,
) = vals
rlen1, off = _read_record_marker(mfmt, msize, data, off, "header closing marker")
if rlen1 != 60:
raise ValueError("tl.grid: unexpected closing marker on header record")
lr = int(np.floor(float(rmax) / (float(dr) * float(ndr))))
if lr < 0:
raise ValueError("tl.grid: negative range count")
lz_i = int(lz)
if lz_i <= 0:
raise ValueError("tl.grid: lz must be positive")
tl = np.zeros((lz_i, lr), dtype=np.float32)
for j in range(lr):
rlen_open, off = _read_record_marker(
mfmt, msize, data, off, f"data record {j} opening marker"
)
nbytes = lz_i * 4
if rlen_open != nbytes:
raise ValueError(f"tl.grid: data record {j} marker {rlen_open} != lz*4 ({nbytes})")
if off + nbytes > len(data):
raise ValueError("tl.grid: truncated TL column")
tl[:, j] = np.frombuffer(data, dtype="<f4", count=lz_i, offset=off)
off += nbytes
rlen_close, off = _read_record_marker(
mfmt, msize, data, off, f"data record {j} closing marker"
)
if rlen_close != nbytes:
raise ValueError(f"tl.grid: data record {j} closing marker mismatch")
r_r = (np.arange(1, lr + 1, dtype=np.float64) * float(ndr) * float(dr)).astype(
np.float64, copy=False
)
r_z = (np.arange(1, lz_i + 1, dtype=np.float64) * float(ndz) * float(dz)).astype(
np.float64, copy=False
)
pressure = np.power(10.0, -tl.astype(np.float64) / 20.0).astype(np.float32)
return RamTlGridResult(
plot_title="RAM",
plot_type="rectilin",
freq=float(freq),
atten=0.0,
pos=RamTlGridPos(s_z=float(zs), r_z=r_z, r_r=r_r),
tl_db=tl,
pressure_amp=pressure,
)
[docs]
def read_ram_tlgrid_bytes(data: bytes) -> RamTlGridResult:
"""Alias of :func:`read_ram_tlgrid` (symmetry with other ``*_bytes`` APIs)."""
return read_ram_tlgrid(data)