Source code for pyrvt.tools

"""Tools for reading/writing of files and performing operations."""

from __future__ import annotations

import functools
import multiprocessing
from pathlib import Path

import numpy as np
import numpy.typing as npt
import pyexcel

from pyrvt import motions
from pyrvt.peak_calculators import get_peak_calculator, get_region

PARAMETER_NAMES = [
    ("mag", "Magnitude"),
    ("dist", "Distance (km)"),
    ("vs30", "Vs30 (m/s)"),
    ("kappa", "Site Atten., Kappa0 (sec)"),  # Site Atten., κ₀
    ("duration", "Duration (sec)"),
    ("region", "Region"),
]


[docs] def get_fpaths(pat: str) -> list[Path]: """Iterate over file paths.""" return Path(".").glob(pat) if "*" in pat else [Path(pat)]
[docs] def read_events( fpath: str | Path, response_type: str ) -> tuple[str, np.ndarray, list[dict]]: """Read data from the file an Excel work book. Parameters ---------- fpath : str or `Path` Filename of the input file. response_type : str Type of response. Valid options are: 'psa' for psuedo-spectral acceleration, or 'fa' for Fourier amplitude. Returns ------- ext : str Extension of input file reference : :class:`numpy.ndarray` Reference of the response. This is either period (sec) for response_type 'psa' or frequency (Hz) for response_type 'fa' events : List[dict] List of events read from the file. See ``Note`` in :func:`.calc_compatible_spectra` for more information on structure of the dictionaries. """ assert response_type in ["psa", "fa"] fpath = Path(fpath) data = pyexcel.get_array(file_name=str(fpath)) ext = fpath.suffix parameters = {key: data[i][1:] for i, (key, label) in enumerate(PARAMETER_NAMES)} event_row = len(parameters) + 1 event_count = len(data[0]) - 1 reference = np.array([row[0] for row in data[event_row:]]) events = [] for i in range(event_count): resps = np.array([row[i + 1] for row in data[event_row:]]) # Extract the appropriate attributes e = {k: v[i] for k, v in parameters.items()} e[response_type] = resps if "region" in e: e["region"] = get_region(e["region"]) events.append(e) return ext, reference, events
[docs] def write_events( fname: str | Path, reference: npt.ArrayLike, reference_label: str, response_type: str, response_label: str, events: list[dict], ): """Write the events to a file. Parameters ---------- fname : str Save the events to this file. The directory is created if needed. reference : array_like Periods of the oscillator response shared across all events. reference_label : str Label of the reference (e.g., 'Frequency (Hz)'). response_type : str Type of response. Valid options: `psa` for pseudo-spectral accleration, or `fa` for Fourier amplitude. response_label : str Label of the response type (e.g., 'Fourier Ampl. (g/sec)') events : List[dict] Events to write to file. See ``Note`` in :func:`.calc_compatible_spectra` for more information. Raises ------ NotImplementedError: If extension is not supported """ # Create the rows of output rows = [] # Output the parameters for key, label in PARAMETER_NAMES: rows.append([label] + [e[key] for e in events]) rows.append([reference_label] + len(events) * [response_label]) # Output the response spectra for i in range(len(reference)): rows.append([reference[i]] + [e[response_type][i] for e in events]) # Create the parent directory fpath = Path(fname) fpath.parent.mkdir(parents=True, exist_ok=True) # Write the file pyexcel.save_as(array=rows, dest_file_name=str(fpath))
def _calc_fa(target_freqs, damping, method, event): """Calculate the fourier amplitudes for an event. Note that this is intended as a helper function to be called by multiprocessing.Pool. """ event_keys = ["mag", "dist", "region"] event_kwds = {key: event[key] for key in event_keys} crm = motions.CompatibleRvtMotion( target_freqs, event["psa"], duration=event["duration"], osc_damping=damping, event_kwds=event_kwds, peak_calculator=get_peak_calculator(method, event_kwds), ) psa_calc = crm.calc_osc_accels(target_freqs, damping) return crm, psa_calc
[docs] def calc_compatible_spectra( method: str, periods: npt.ArrayLike, events: list[dict], damping: float = 0.05 ) -> tuple[np.ndarray, list[dict]]: """Compute the response spectrum compatible motions. Parameters ---------- method : str RVT peak factor method, see :func:`~.peak_calculators.get_peak_calculator`. periods : array_like Periods of the oscillator response shared across all events. events : List[dict] All events to consider. See ``Note``. damping : float, optional Fractional damping of the oscillator (decimal). Default value of 0.05 for a damping ratio of 5%. Returns ------- :class:`numpy.ndarray` Frequency of the computed Fourier amplitude spectra. dicts Modified events Note ---- Each event dictionary should have the following keys: - **psa** : :class:`numpy.ndarray` -- pseudo-spectral accelerations. This is the target for the :class:`~.motions.CompatibleRvtMotion`. - **duration** : float, optional -- duration of the ground motion - **magnitude** : float, optional -- earthquake magnitude - **distance** : float, optional -- earthquake distance (km) - **region** : str, optional -- earthquake source region, see :func:`~.peak_calculators.get_region` If no duration is provided one is estimated from the magnitude, distance, and region. The `events` dictionary is modified by this function and adds the following keys: - **duration** : float -- duration of the ground motion if one was not specified - **fa** : :class:`numpy.ndarray` -- Fourier amplitude spectra in units of g*sec - **psa_calc** : :class:`numpy.ndarray` -- Pseudo-spectral acceleration calculated from `fa`. This will differ slightly from `psa_target`. """ target_freqs = 1.0 / periods with multiprocessing.Pool() as pool: results = pool.map( functools.partial(_calc_fa, target_freqs, damping, method), events ) # Copy values back into the dictionary modified = [] for event, (crm, psa_calc) in zip(events, results): event = dict(event) if not event["duration"]: event["duration"] = crm.duration event["fa"] = crm.fourier_amps event["psa_calc"] = psa_calc modified.append(event) # Return the frequency from one of the computed motions. freqs = results[0][0].freqs return freqs, modified
[docs] def operation_psa2fa( src: str | Path, dst: str | Path, damping: float, method: str = "LP99", fixed_spacing: bool = True, verbose: bool = True, ): """Compute the acceleration response spectrum from a Fourier amplitude spectrum. Parameters ---------- src : str Source for the pseudo-spectral accelerations (PSA). This can be a filename or pattern used in :func:`glob.glob`. dst : str Destination directory for the output PSA. The directory is created if it does not exist. damping : float Fractional damping of the oscillator ( decimal). method : str RVT peak factor method, see :func:`~.peak_calculators.get_peak_calculator`. fixed_spacing : bool, optional If `True`, then the periods are interpolated to 301 points equally space in log-space from 0.01 to 10. verbose : bool, optional Print status of calculation. """ dst = Path(dst) dst.mkdir(parents=True, exist_ok=True) for fpath in get_fpaths(src): if verbose: print("Processing:", fpath) ext, periods, events = read_events(fpath, "psa") if fixed_spacing: # Interpolate the periods to a smaller range _periods = np.logspace(-2, 1, 301) for e in events: e["psa"] = np.exp(np.interp(_periods, periods, np.log(e["psa"]))) periods = _periods # Compute the FA from the PSA freqs, events = calc_compatible_spectra( method, periods, events, damping=damping ) basename = fpath.stem.rsplit("_", 1)[0] write_events( dst / (basename + "_sa" + ext), periods, "Period (s)", "psa_calc", "Sa (g)", events, ) write_events( dst / (basename + "_fa" + ext), freqs, "Frequency (Hz)", "fa", "FA (g-s)", events, )
def _calc_psa( osc_freqs: npt.ArrayLike, damping: float, method: str, freqs: npt.ArrayLike, event: dict[str, float], ) -> np.ndarray: """Calculate the response spectra for an event. Note that this is intended as a helper function to be called by multiprocessing.Pool. """ m = motions.RvtMotion( freqs=freqs, fourier_amps=event["fa"], duration=event["duration"], peak_calculator=get_peak_calculator( method, { "region": event["region"], "mag": event["mag"], "dist": event["dist"], }, ), ) return m.calc_osc_accels(osc_freqs, damping)
[docs] def operation_fa2psa( src: str | Path, dst: str | Path, damping: float, method: str = "LP99", fixed_spacing: bool = True, verbose: bool = True, ): """Compute the Fourier amplitude spectrum from a accel. response spectrum. Parameters ---------- src : str Source for the Fourier amplitudes. This can be a filename or pattern used in :func:`glob.glob`. dst : str Destination directory for the output PSA. The directory is created if it does not exist. damping : float Fractional damping of the oscillator (decimal). method : str RVT peak factor method, see :func:`~.peak_calculators.get_peak_calculator`. fixed_spacing : bool, optional If `True`, then the periods are interpolated to 301 points equally space in log-space from 0.01 to 10. """ if fixed_spacing: periods = np.logspace(-2, 1, 301) osc_freqs = 1.0 / periods dst = Path(dst) dst.mkdir(parents=True, exist_ok=True) for fpath in get_fpaths(src): if verbose: print("Processing:", fpath) ext, freqs, events = read_events(fpath, "fa") if not fixed_spacing: osc_freqs = freqs periods = 1.0 / osc_freqs with multiprocessing.Pool() as pool: psas = pool.map( functools.partial(_calc_psa, osc_freqs, damping, method, freqs), events ) for event, psa in zip(events, psas): event["psa"] = psa basename = fpath.stem.rsplit("_", 1)[0] write_events( dst / (basename + "_sa" + ext), periods, "Period (s)", "psa", "Sa (g)", events, )
[docs] def read_at2(fname: str) -> tuple[float, np.ndarray]: """Read an AT2 formatted file.""" with open(fname) as fp: # Skip the first three header lines for _ in range(3): next(fp) parts = next(fp).replace(",", "").split() time_step = float(parts[3].replace("SEC", "")) accels = np.array([float(part) for line in fp for part in line.split()]) return time_step, accels