Source code for sage_analysis.sage_binary

#!/usr/bin/env python
"""
This module defines the ``SageBinaryData`` class. This class interfaces with the
:py:class:`~sage_analysis.model.Model` class to read in binary data written by **SAGE**.
The value of :py:attr:`~sage_analysis.model.Model.sage_output_format` is generally
``sage_binary`` if it is to be read with this class.

If you wish to ingest data from your own flavour of SAGE, please open a Github issue, I plan to add this documentation
in future :)

Author: Jacob Seiler.
"""
import logging
import os
from typing import Any, Dict, Optional

import numpy as np
from tqdm import tqdm

from sage_analysis.data_class import DataClass
from sage_analysis.model import Model
from sage_analysis.utils import read_generic_sage_params

logger = logging.getLogger(__name__)


[docs]class SageBinaryData(DataClass): """ Class intended to inteface with the :py:class:`~sage_analysis.model.Model` class to ingest the data written by **SAGE**. It includes methods for reading the output galaxies, setting cosmology etc. It is specifically written for when :py:attr:`~sage_analysis.model.Model.sage_output_format` is ``sage_binary``. """
[docs] def __init__(self, model: Model, sage_file_to_read: str) -> None: """ Instantiates the Data Class for reading in **SAGE** binary data. In particular, generates the ``numpy`` structured array to read the output galaxies. model: :py:class:`~sage_analysis.model.Model` instance The model that this data class is associated with; this class will read the data for this model. """ logger.info("Reading using SAGE binary output format.") self._get_galaxy_struct() # Use the SAGE parameter file to generate a bunch of attributes. sage_dict = self.read_sage_params(sage_file_to_read) self.sage_model_dict = sage_dict logger.info(f"The read SAGE parameters are {sage_dict}")
[docs] def determine_volume_analyzed(self, model: Model) -> float: """ Determines the volume analyzed. This can be smaller than the total simulation box. Parameters ---------- model : :py:class:`~sage_analysis.model.Model` instance The model that this data class is associated with. Returns ------- volume : float The numeric volume being processed during this run of the code in (Mpc/h)^3. """ # To properly scale properties that use the simulation volume (e.g., SMF), we need # to know how much of the volume this model is analysing. SAGE is formulated such # that every processor writes out a single file. However, each model here can # analyze fewer files than were simulated by SAGE. # For example, SAGE could have run on 4 processors, and hence 4 files would be # produced. To allow the quick inspection of results, we may only be running our # analysis on one file. Hence, we should scale some of the properties by a factor # of 4. # Importantly: There is no way for the binary output of SAGE to know this factor! # Hence, if the user is running in binary mode, they MUST specify the total number # of files that SAGE output (i.e., the number of processors they ran SAGE with). frac_volume_analyzed = (model._last_file_to_analyze - model._first_file_to_analyze + 1) / \ model._num_sage_output_files volume = pow(model._box_size, 3) * frac_volume_analyzed logger.info( f"The files read is [{model._first_file_to_analyze}, {model._last_file_to_analyze}] with a total number " f"of {model._num_sage_output_files}; resulting a volume fraction analyzed of {frac_volume_analyzed}.\nThe " f"box size is {model._box_size} (Mpc/h) yielding a analyzed volume of {volume} (Mpc/h)^3." ) return volume
[docs] def read_sage_params(self, sage_file_path: str) -> Dict[str, Any]: """ Read the **SAGE** parameter file. Parameters ---------- sage_file_path: string Path to the **SAGE** parameter file. Returns ------- model_dict: dict [str, var] Dictionary containing the parameter names and their values. """ model_dict = read_generic_sage_params(sage_file_path) return model_dict
def _get_galaxy_struct(self): """ Sets the ``numpy`` structured array for holding the galaxy data. """ galdesc_full = [ ("SnapNum", np.int32), ("Type", np.int32), ("GalaxyIndex", np.int64), ("CentralGalaxyIndex", np.int64), ("SAGEHaloIndex", np.int32), ("SAGETreeIndex", np.int32), ("SimulationHaloIndex", np.int64), ("mergeType", np.int32), ("mergeIntoID", np.int32), ("mergeIntoSnapNum", np.int32), ("dT", np.float32), ("Pos", (np.float32, 3)), ("Vel", (np.float32, 3)), ("Spin", (np.float32, 3)), ("Len", np.int32), ("Mvir", np.float32), ("CentralMvir", np.float32), ("Rvir", np.float32), ("Vvir", np.float32), ("Vmax", np.float32), ("VelDisp", np.float32), ("ColdGas", np.float32), ("StellarMass", np.float32), ("BulgeMass", np.float32), ("HotGas", np.float32), ("EjectedMass", np.float32), ("BlackHoleMass", np.float32), ("IntraClusterStars", np.float32), ("MetalsColdGas", np.float32), ("MetalsStellarMass", np.float32), ("MetalsBulgeMass", np.float32), ("MetalsHotGas", np.float32), ("MetalsEjectedMass", np.float32), ("MetalsIntraClusterStars", np.float32), ("SfrDisk", np.float32), ("SfrBulge", np.float32), ("SfrDiskZ", np.float32), ("SfrBulgeZ", np.float32), ("DiskRadius", np.float32), ("Cooling", np.float32), ("Heating", np.float32), ("QuasarModeBHaccretionMass", np.float32), ("TimeOfLastMajorMerger", np.float32), ("TimeOfLastMinorMerger", np.float32), ("OutflowRate", np.float32), ("infallMvir", np.float32), ("infallVvir", np.float32), ("infallVmax", np.float32) ] names = [galdesc_full[i][0] for i in range(len(galdesc_full))] formats = [galdesc_full[i][1] for i in range(len(galdesc_full))] galdesc = np.dtype({"names": names, "formats": formats}, align=True) self.galaxy_struct = galdesc
[docs] def determine_num_gals(self, model: Model, *args): """ Determines the number of galaxies in all files for this :py:class:`~sage_analysis.model.Model`. Parameters ---------- model: :py:class:`~sage_analysis.model.Model` class The :py:class:`~sage_analysis.model.Model` we're reading data for. *args : Any Extra arguments to allow other data class to pass extra arguments to their version of ``determine_num_gals``. """ num_gals = 0 for file_num in range(model._first_file_to_analyze, model._last_file_to_analyze+1): fname = self._check_for_file(model, file_num) if fname is None: logger.debug(f"File\t{fname} \tdoes not exist!") continue with open(fname, "rb") as f: _ = np.fromfile(f, np.dtype(np.int32), 1)[0] num_gals_file = np.fromfile(f, np.dtype(np.int32), 1)[0] num_gals += num_gals_file model._num_gals_all_files = num_gals
[docs] def read_gals( self, model: Model, file_num: int, snapshot: int, pbar: Optional[tqdm] = None, plot_galaxies: bool = False, debug: bool = False ): """ Reads the galaxies of a model file at snapshot specified by :py:attr:`~sage_analysis.model.Model.snapshot`. Parameters ---------- model: :py:class:`~sage_analysis.model.Model` class The :py:class:`~sage_analysis.model.Model` we're reading data for. file_num: int Suffix number of the file we're reading. pbar: ``tqdm`` class instance, optional Bar showing the progress of galaxy reading. If ``None``, progress bar will not show. plot_galaxies: bool, optional If set, plots and saves the 3D distribution of galaxies for this file. debug: bool, optional If set, prints out extra useful debug information. Returns ------- gals : ``numpy`` structured array with format given by :py:method:`~_get_galaxy_struct` The galaxies for this file. Notes ----- ``tqdm`` does not play nicely with printing to stdout. Hence we disable the ``tqdm`` progress bar if ``debug=True``. """ fname = self._check_for_file(model, file_num) if fname is None: logger.debug(f"File\t{fname} \tdoes not exist!") return None with open(fname, "rb") as f: # First read the header information. Ntrees = np.fromfile(f, np.dtype(np.int32), 1)[0] num_gals = np.fromfile(f, np.dtype(np.int32), 1)[0] _ = np.fromfile(f, np.dtype((np.int32, Ntrees)), 1) # If there aren't any galaxies, exit here. if num_gals == 0: return None # Then the actual galaxies. gals = np.fromfile(f, self.galaxy_struct, num_gals) # If we're using the `tqdm` package, update the progress bar. if pbar is not None: pbar.set_postfix(file=fname, refresh=False) pbar.update(num_gals) if debug: print("") print(f"File {fname} contained {Ntrees} trees with {num_gals} galaxies") w = np.where(gals["StellarMass"] > 1.0)[0] print(f"{len(w)} of these galaxies have mass greater than 10^10Msun/h") if plot_galaxies: from sage_analysis.plots import plot_spatial_3d # Show the distribution of galaxies in 3D. pos = gals["Pos"][:] output_file = f"./galaxies_{file_num}.{model.plot_output_format}" plot_spatial_3d(pos, output_file, self.box_size) # For the HDF5 file, some data sets have dimensions Nx1 rather than Nx3 # (e.g., Position). To ensure the galaxy data format is identical to the binary # output, we will split the binary fields into Nx1. This is simpler than creating # a new dataset within the HDF5 regime. from numpy.lib import recfunctions as rfn multidim_fields = ["Pos", "Vel", "Spin"] dim_names = ["x", "y", "z"] for field in multidim_fields: for dim_num, dim_name in enumerate(dim_names): dim_field = f"{field}{dim_name}" gals = rfn.rec_append_fields(gals, dim_field, gals[field][:, dim_num]) return gals
[docs] def update_snapshot_and_data_path(self, model: Model, snapshot: int, use_absolute_path: bool = False): """ Updates the :py:attr:`~sage_analysis.model.Model._sage_data_path` to point to a new redshift file. Uses the redshift array :py:attr:`~sage_analysis.model.Model.redshifts`. Parameters ---------- snapshot : int Snapshot we're updating :py:attr:`~sage_analysis.model.Model._sage_data_path` to point to. use_absolute_path : bool If specified, will use the absolute path to the **SAGE** output data. Otherwise, will use the path that is relative to the **SAGE** parameter file. This is hand because the **SAGE** parameter file can contain either relative or absolute paths. """ model._snapshot = snapshot new_redshift = model.redshifts[snapshot] # The parameter file could refer to the absolute path or the relative path, so be careful. if use_absolute_path: base_path = model._base_sage_output_path_absolute else: base_path = model._base_sage_output_path_relative model._sage_data_path = f"{base_path}_z{new_redshift:.3f}"
def _check_for_file(self, model: Model, file_num: int) -> Optional[str]: """ Checks to see if a file for the given file number exists. Importantly, we check assuming that the path given in the **SAGE** parameter file is **relative** and **absolute**. Parameters ---------- file_num : int The file number that we're checking for files. Returns ------- fname or ``None`` If a file exists, the name of that file. Otherwise, if the file does not exist (using either relative or absolute paths), then ``None``. """ # Try relative, and then absolute paths. for use_absolute_path in [False, True]: self.update_snapshot_and_data_path(model, model._snapshot, use_absolute_path=use_absolute_path) fname = f"{model._sage_data_path}_{file_num}" if os.path.isfile(fname): return fname return None
[docs] def close_file(self, model: Model): """ An empty method to ensure consistency with the HDF5 data class. This is empty because snapshots are saved over different files by default in the binary format. """ pass