"""Combine output files from individual simulation runs into a single library hdf5 file.
This file can be executed as a script (see the :func:`main` function), and also provides an API
method (:func:`sam_lib_combine`) for programatically combining libraries.
"""
import argparse
from pathlib import Path
import numpy as np
import h5py
import tqdm
import holodeck as holo
import holodeck.librarian
from holodeck.librarian import libraries
[docs]
def main():
"""Command-line interface executable method for combining holodeck simulation files.
"""
log = holo.log
try:
from mpi4py import MPI
comm = MPI.COMM_WORLD
except ModuleNotFoundError:
comm = None
if (comm is not None) and (comm.rank > 0):
err = f"Cannot run `{__file__}::main()` with multiple processors!"
log.exception(err)
raise RuntimeError(err)
parser = argparse.ArgumentParser()
parser.add_argument(
'path', default=None,
help='library directory to run combination on; must contain the `sims` subdirectory'
)
parser.add_argument(
'--recreate', '-r', action='store_true', default=False,
help='recreate/replace existing combined library file with a new merge.'
)
parser.add_argument(
'--gwb', action='store_true', default=False,
help='only merge the key GWB data (no single source, or binary parameter data).'
)
args = parser.parse_args()
log.debug(f"{args=}")
path = Path(args.path)
sam_lib_combine(path, log, recreate=args.recreate, gwb_only=args.gwb)
return
[docs]
def sam_lib_combine(path_output, log, path_pspace=None, recreate=False, gwb_only=False):
"""Combine individual simulation files into a single library (hdf5) file.
Arguments
---------
path_output : str or Path,
Path to output directory where combined library will be saved.
log : `logging.Logger`
Logging instance.
path_sims : str or None,
Path to output directory containing simulation files.
If `None` this is set to be the same as `path_output`.
path_pspace : str or None,
Path to file containing _Param_Space subclass instance.
If `None` then `path_output` is searched for a `_Param_Space` save file.
Returns
-------
lib_path : Path,
Path to library output filename (typically ending with 'sam_lib.hdf5').
"""
# ---- setup paths
path_output = Path(path_output)
log.info(f"Path output = {path_output}")
path_sims = path_output.joinpath('sims')
# ---- see if a combined library already exists
lib_path = libraries.get_sam_lib_fname(path_output, gwb_only)
if lib_path.exists():
lvl = log.INFO if recreate else log.WARNING
log.log(lvl, f"combined library already exists: {lib_path}, run with `-r` to recreate.")
if not recreate:
return
log.log(lvl, "re-combining data into new file")
# ---- load parameter space from save file
'''
if path_pspace is None:
# look for parameter-space save files
regex = "*" + holo.librarian.PSPACE_FILE_SUFFIX # "*.pspace.npz"
files = sorted(path_output.glob(regex))
num_files = len(files)
msg = f"found {num_files} pspace.npz files in {path_output}"
log.info(msg)
if num_files != 1:
log.exception(msg)
log.exception(f"{files=}")
log.exception(f"{regex=}")
raise RuntimeError(f"{msg}")
path_pspace = files[0]
pspace = _Param_Space.from_save(path_pspace, log)
'''
if path_pspace is None:
path_pspace = path_output
pspace, pspace_fname = libraries.load_pspace_from_path(path_pspace, log=log)
log.info(f"loaded param space: {pspace} from '{pspace_fname}'")
param_names = pspace.param_names
param_samples = pspace.param_samples
nsamp, ndim = param_samples.shape
log.debug(f"{nsamp=}, {ndim=}, {param_names=}")
# ---- make sure all files exist; get shape information from files
log.info(f"checking that all {nsamp} files exist")
fobs_cents, fobs_edges, nreals, nloudest, has_gwb, has_ss, has_params = _check_files_and_load_shapes(
log, path_sims, nsamp
)
nfreqs = fobs_cents.size
log.debug(f"{nfreqs=}, {nreals=}, {nloudest=}")
log.debug(f"{has_gwb=}, {has_ss=}, {has_params=}")
if not has_gwb and gwb_only:
err = f"Combining with {gwb_only=}, but received {has_gwb=} from `_check_files_and_load_shapes`!"
log.exception(err)
raise RuntimeError(err)
if (fobs_cents is None) or (nreals is None):
err = f"After checking files, {fobs_cents=} and {nreals=}!"
log.exception(err)
raise ValueError(err)
# ---- load results from all files
gwb = np.zeros((nsamp, nfreqs, nreals)) if has_gwb else None
if (not gwb_only) and has_ss:
hc_ss = np.zeros((nsamp, nfreqs, nreals, nloudest))
hc_bg = np.zeros((nsamp, nfreqs, nreals))
else:
hc_ss = None
hc_bg = None
if (not gwb_only) and has_params:
sspar = np.zeros((nsamp, 4, nfreqs, nreals, nloudest))
bgpar = np.zeros((nsamp, 7, nfreqs, nreals))
else:
sspar = None
bgpar = None
gwb, hc_ss, hc_bg, sspar, bgpar, bad_files = _load_library_from_all_files(
path_sims, gwb, hc_ss, hc_bg, sspar, bgpar, log,
)
if has_gwb:
log.info(f"Loaded data from all library files | {holo.utils.stats(gwb)=}")
param_samples[bad_files] = np.nan
# ---- Save to concatenated output file ----
log.info(f"Writing collected data to file {lib_path}")
with h5py.File(lib_path, 'w') as h5:
h5.create_dataset('fobs_cents', data=fobs_cents)
h5.create_dataset('fobs_edges', data=fobs_edges)
h5.create_dataset('sample_params', data=param_samples)
if gwb is not None:
h5.create_dataset('gwb', data=gwb)
if not gwb_only:
if has_ss:
h5.create_dataset('hc_ss', data=hc_ss)
h5.create_dataset('hc_bg', data=hc_bg)
if has_params:
h5.create_dataset('sspar', data=sspar)
h5.create_dataset('bgpar', data=bgpar)
h5.attrs['param_names'] = np.array(param_names).astype('S')
# new in librarian-v1.1
h5.attrs['parameter_space_class_name'] = pspace.name
h5.attrs['holodeck_version'] = holo.__version__
# I'm not sure if this can/will throw errors, but don't let the combination fail if it does.
try:
git_hash = holo.utils.get_git_hash()
except: # noqa
git_hash = "None"
h5.attrs['holodeck_git_hash'] = git_hash
h5.attrs['holodeck_librarian_version'] = holo.librarian.__version__
log.warning(f"Saved to {lib_path}, size: {holo.utils.get_file_size(lib_path)}")
return lib_path
def _check_files_and_load_shapes(log, path_sims, nsamp):
"""Check that all `nsamp` files exist in the given path, and load info about array shapes.
Arguments
---------
path_sims : str
Path in which individual simulation files can be found.
nsamp : int
Number of simulations/files that should be found.
This should typically be loaded from the parameter-space object used to generate the library.
Returns
-------
fobs : (F,) ndarray
Observer-frame frequency bin centers at which GW signals are calculated.
nreals : int
Number of realizations in the output files.
"""
fobs_edges = None
fobs_cents = None
nreals = None
nloudest = None
has_gwb = False
has_ss = False
has_params = False
log.info(f"Checking {nsamp} files in {path_sims}")
for ii in tqdm.trange(nsamp):
temp_fname = libraries._get_sim_fname(path_sims, ii)
if not temp_fname.exists():
err = f"Missing at least file number {ii} out of {nsamp} files! {temp_fname}"
log.exception(err)
raise ValueError(err)
# if we've already loaded all of the necessary info, then move on to the next file
if (fobs_cents is not None) and (nreals is not None) and (nloudest is not None):
continue
temp = np.load(temp_fname)
data_keys = list(temp.keys())
log.debug(f"{ii=} {temp_fname.name=} {data_keys=}")
if fobs_cents is None:
_fobs = temp.get('fobs', None)
if _fobs is not None:
err = "Found `fobs` in data, expected only `fobs_cents` and `fobs_edges`!"
log.exception(err)
raise ValueError(err)
fobs_cents = temp['fobs_cents']
fobs_edges = temp['fobs_edges']
if (not has_gwb) and ('gwb' in data_keys):
has_gwb = True
if (not has_ss) and ('hc_ss' in data_keys):
assert 'hc_bg' in data_keys
has_ss = True
if (not has_params) and ('sspar' in data_keys):
assert 'bgpar' in data_keys
has_params = True
# find a file that has GWB data in it (not all of them do, if the file was a 'failure' file)
if (nreals is None):
nreals_1 = None
nreals_2 = None
if ('gwb' in data_keys):
nreals_1 = temp['gwb'].shape[-1]
nreals = nreals_1
if ('hc_bg' in data_keys):
nreals_2 = temp['hc_bg'].shape[-1]
nreals = nreals_2
if (nreals_1 is not None) and (nreals_2 is not None):
assert nreals_1 == nreals_2
if (nloudest is None) and ('hc_ss' in data_keys):
nloudest = temp['hc_ss'].shape[-1]
return fobs_cents, fobs_edges, nreals, nloudest, has_gwb, has_ss, has_params
def _load_library_from_all_files(path_sims, gwb, hc_ss, hc_bg, sspar, bgpar, log):
"""Load data from all individual simulation files.
Arguments
---------
path_sims : str
Path to find individual simulation files.
gwb : (S, F, R) ndarray
Array in which to store GWB data from all of 'S' files.
S: num-samples/simulations, F: num-frequencies, R: num-realizations.
log : `logging.Logger`
Logging instance.
"""
if hc_bg is not None:
nsamp = hc_bg.shape[0]
elif gwb is not None:
nsamp = gwb.shape[0]
else:
err = "Unable to get shape from either `hc_bg` or `gwb`!"
log.exception(err)
raise RuntimeError(err)
log.info(f"Collecting data from {nsamp} files")
bad_files = np.zeros(nsamp, dtype=bool) #: track which files contain UN-useable data
msg = None
for pnum in tqdm.trange(nsamp):
fname = libraries._get_sim_fname(path_sims, pnum)
temp = np.load(fname, allow_pickle=True)
# When a processor fails for a given parameter, the output file is still created with the 'fail' key added
if ('fail' in temp):
msg = f"file {pnum=:06d} is a failure file, setting values to NaN ({fname})"
log.warning(msg)
# set all parameters to NaN for failure files. Note that this is distinct from gwb=0.0 which can be real.
if gwb is not None:
gwb[pnum, :, :] = np.nan
# `hc_ss` will be set to None if `gwb_only==True`
if hc_ss is not None:
hc_ss[pnum, :, :, :] = np.nan
hc_bg[pnum, :, :] = np.nan
bad_files[pnum] = True
continue
# store the GWB from this file
if gwb is not None:
gwb[pnum, :, :] = temp['gwb'][...]
# `hc_ss` will be set to None if `gwb_only==True`
if (hc_ss is not None):
hc_ss[pnum, :, :, :] = temp['hc_ss'][...]
hc_bg[pnum, :, :] = temp['hc_bg'][...]
# `hc_ss` will be set to None if `gwb_only==True`
if bgpar is not None:
sspar[pnum, :, :, :, :] = temp['sspar'][...]
bgpar[pnum, :, :, :] = temp['bgpar'][...]
log.info(f"{holo.utils.frac_str(bad_files)} files are failures")
return gwb, hc_ss, hc_bg, sspar, bgpar, bad_files
if __name__ == "__main__":
main()