Source code for at_py.readwrite.fortran_records

from __future__ import annotations

import struct
from dataclasses import dataclass


@dataclass
class FortranRecordReader:
    """Reader for Fortran unformatted sequential records over a bytes buffer.

    Each record is laid out as ``<marker><payload><marker>``. A marker is
    ``marker_len`` little-endian unsigned 32-bit words; the first word holds
    the payload length in bytes.

    Attributes:
        data: Buffer containing the records.
        marker_len: Number of 4-byte words per record marker (must be >= 1).
        offset: Current read position in ``data``, in bytes.
    """

    data: memoryview
    marker_len: int
    offset: int = 0

    def _read_marker(self) -> tuple[int, bytes]:
        """Read marker words; return ``(record_byte_length, raw_marker_bytes)``.

        Advances ``offset`` past the marker on success.

        Raises:
            ValueError: If ``marker_len`` is smaller than 1 or the buffer ends
                before the marker is complete.
        """
        if self.marker_len < 1:
            # Without this guard a zero width surfaces as struct.error and a
            # negative width silently moves the offset backwards.
            raise ValueError(f"marker_len must be >= 1, got {self.marker_len}")
        n = 4 * self.marker_len
        if self.offset + n > len(self.data):
            raise ValueError("unexpected EOF while reading record marker")
        raw = self.data[self.offset : self.offset + n].tobytes()
        self.offset += n
        # Only the first word carries the length; extra words are checked by
        # the caller.
        (length,) = struct.unpack_from("<I", raw, 0)
        return int(length), raw

    def read_record(self) -> bytes:
        """Read one unformatted record payload (markers validated).

        On success ``offset`` points just past the trailing marker. On any
        failure ``offset`` is restored to the start of the record, so the
        reader is never left positioned in the middle of a record.

        Returns:
            The payload bytes between the leading and trailing markers.

        Raises:
            ValueError: If the buffer is truncated, the leading and trailing
                lengths differ, or (for ``marker_len == 2``) the second marker
                word is neither zero nor a copy of the length.
        """
        record_start = self.offset
        try:
            start_len, start_raw = self._read_marker()
            payload_end = self.offset + start_len
            if payload_end > len(self.data):
                raise ValueError("unexpected EOF while reading record payload")
            payload = self.data[self.offset : payload_end].tobytes()
            self.offset = payload_end

            end_len, end_raw = self._read_marker()
            if end_len != start_len:
                raise ValueError(f"record marker mismatch: {start_len} != {end_len}")

            if self.marker_len == 2:
                # Accept common variants for the extra word: duplicate length or zero.
                extra_start = struct.unpack_from("<I", start_raw, 4)[0]
                extra_end = struct.unpack_from("<I", end_raw, 4)[0]
                ok_start = extra_start in (0, start_len)
                ok_end = extra_end in (0, end_len)
                if not (ok_start and ok_end):
                    raise ValueError("unsupported marker_len=2 record-marker variant")
        except Exception:
            # Roll back so a failed read does not leave a half-consumed record.
            self.offset = record_start
            raise
        return payload
def autodetect_marker_len(data: bytes, *, allowed: tuple[int, ...] = (1, 2)) -> int:
    """Detect record-marker word count by reading the first flag record.

    Each candidate in ``allowed`` is tried in order. A candidate is accepted
    when the first record parses cleanly and its payload is exactly
    ``b"'2D'"`` or ``b"'3D'"``.

    Args:
        data: Buffer holding the file contents, starting at the first record.
        allowed: Candidate marker widths, in 4-byte words, tried in order.

    Returns:
        The first candidate width that yields a valid flag record.

    Raises:
        ValueError: If no candidate produces a recognised flag record.
    """
    view = memoryview(data)
    for candidate in allowed:
        reader = FortranRecordReader(view, marker_len=candidate, offset=0)
        try:
            payload = reader.read_record()
        except (ValueError, struct.error):
            # A wrong width guess shows up as a parse failure; try the next
            # one. Other exception types indicate a real bug and propagate.
            continue
        if payload in (b"'2D'", b"'3D'"):
            return candidate
    raise ValueError("could not detect Fortran record marker length")