Source code for seisnn.utils

"""
Utilities
"""

import functools
import glob
import multiprocessing as mp
import os

import numpy as np
import tqdm
import yaml


class Config:
    """
    Workspace paths loaded from a ``config.yml`` file.

    Every slot holds one entry read from the YAML file by
    :meth:`load_config`.

    :param str workspace: Directory that holds ``config.yml``.
        Default is the user's home directory.
    :param bool initialize: If True, write a fresh ``config.yml`` into
        ``workspace`` and create the data folders before loading.
    """
    __slots__ = [
        'workspace',
        'sds_root',
        'sfile_root',
        'tfrecord',
        'train',
        'test',
        'eval',
        'sql_database',
        'catalog',
        'geom',
        'models',
    ]

    def __init__(self, workspace=os.path.expanduser("~"), initialize=False):
        if initialize:
            # Generate the file inside the requested workspace; writing it
            # to the default location would make the load below look in a
            # different directory than the one just written.
            self.generate_config(workspace)
            self.load_config(workspace)
            self.create_folders()
            return

        try:
            self.load_config(workspace)
        except FileNotFoundError:
            print('Missing config.yml, please use Config(initialize=True).')

    def load_config(self, workspace=os.path.expanduser("~")):
        """
        Read ``config.yml`` from ``workspace`` and fill every slot.

        :param str workspace: Directory that holds ``config.yml``.
        :raises FileNotFoundError: If ``config.yml`` does not exist.
        :raises KeyError: If an expected key is missing from the file.
        """
        config_file = os.path.abspath(os.path.join(workspace, 'config.yml'))
        with open(config_file, 'r') as file:
            # NOTE(review): full_load accepts more than plain scalars and
            # mappings; if this file can ever come from an untrusted
            # source, consider yaml.safe_load instead.
            config = yaml.full_load(file)

        self.workspace = config['WORKSPACE']
        self.sds_root = config['SDS_ROOT']
        self.sfile_root = config['SFILE_ROOT']

        self.tfrecord = config['TFRecord']
        self.train = config['Train']
        self.test = config['Test']
        self.eval = config['Eval']

        self.sql_database = config['SQL_Database']
        self.catalog = config['Catalog']
        self.geom = config['Geom']
        self.models = config['Models']

    @staticmethod
    def generate_config(workspace=os.path.expanduser('~')):
        """
        Write a default ``config.yml`` into ``workspace``.

        All paths in the file are sub-directories of ``workspace``.

        :param str workspace: Directory that receives ``config.yml``.
        """
        config = {
            'WORKSPACE': workspace,
            'SDS_ROOT': os.path.join(workspace, 'SDS_ROOT'),
            'SFILE_ROOT': os.path.join(workspace, 'SFILE_ROOT'),

            'TFRecord': os.path.join(workspace, 'TFRecord'),
            'Train': os.path.join(workspace, 'TFRecord', 'Train'),
            'Test': os.path.join(workspace, 'TFRecord', 'Test'),
            'Eval': os.path.join(workspace, 'TFRecord', 'Eval'),

            'SQL_Database': os.path.join(workspace, 'SQL_Database'),
            'Catalog': os.path.join(workspace, 'Catalog'),
            'Geom': os.path.join(workspace, 'Geom'),
            'Models': os.path.join(workspace, 'Models'),
        }

        # The workspace itself may not exist yet on a first run.
        os.makedirs(workspace, exist_ok=True)
        path = os.path.join(workspace, 'config.yml')
        with open(path, 'w') as file:
            # sort_keys=False keeps the keys in the order written above.
            yaml.dump(config, file, sort_keys=False)
        print(f'Create config: {path}')

    def create_folders(self):
        """
        Create the data folders named by the loaded configuration.

        ``sds_root`` and ``sfile_root`` are not created here.
        """
        path_list = [
            self.tfrecord,
            self.train,
            self.test,
            self.eval,
            self.sql_database,
            self.catalog,
            self.geom,
            self.models,
        ]
        for d in path_list:
            make_dirs(d)
            print(f'Create folder: {d}')
def make_dirs(path):
    """
    Make sure a directory exists, creating missing parents as needed.

    :param str path: Directory path.
    """
    if os.path.isdir(path):
        # Already there, nothing to create.
        return
    os.makedirs(path, mode=0o777, exist_ok=True)
def batch(iterable, size=1):
    """
    Generate consecutive slices of a sequence.

    The last slice is shorter when the length is not a multiple of
    ``size``.

    :param iterable: Data list (must support ``len`` and slicing).
    :param int size: Batch size.
    """
    total = len(iterable)
    for start in range(0, total, size):
        stop = start + size
        if stop > total:
            stop = total
        yield iterable[start:stop]
def batch_operation(data_list, func, **kwargs):
    """
    Apply a function to every item of a batch.

    :param data_list: List of data.
    :param func: Targeted function, called as ``func(item, **kwargs)``.
    :param kwargs: Fixed function parameter.
    :return: List of results, in the same order as ``data_list``.
    """
    results = []
    for item in data_list:
        results.append(func(item, **kwargs))
    return results
def _parallel_process(file_list, par, batch_size=None, cpu_count=None):
    """
    Parallelize a partial function and return results in a list.

    ``file_list`` is split into batches and each batch is handed to
    ``par`` in a worker process. Results are collected in completion
    order, not input order.

    :param list file_list: Process list for partial function.
    :param par: Partial function, called with one batch at a time.
    :param int batch_size: (Optional.) Items per batch. Default splits
        ``file_list`` evenly over ``cpu_count`` workers.
    :param int cpu_count: (Optional.) Number of worker processes.
        Default is the number of CPUs reported by ``multiprocessing``.
    :rtype: list
    :return: List of results, one entry per batch.
    """
    if cpu_count is None:
        cpu_count = mp.cpu_count()
    print(f'Found {cpu_count} cpu threads:')

    # Nothing to do; also avoids a computed batch size of 0, which is not
    # a valid step for splitting the list.
    if len(file_list) == 0:
        return []

    if not batch_size:
        batch_size = int(np.ceil(len(file_list) / cpu_count))

    # The context manager terminates the workers if a task raises, so a
    # failing batch does not leave processes behind.
    with mp.Pool(processes=cpu_count, maxtasksperchild=1) as pool:
        result = list(pool.imap_unordered(par, batch(file_list, batch_size)))
        pool.close()
        pool.join()
    return result
def parallel(data_list, func, batch_size=None, cpu_count=None, **kwargs):
    """
    Run a function over a list of data in parallel.

    ``func`` is bound together with ``kwargs`` into a per-batch worker
    and dispatched through ``_parallel_process``.

    :param data_list: List of data.
    :param func: Paralleled function.
    :param batch_size: (Optional.) Forwarded to ``_parallel_process``.
    :param cpu_count: (Optional.) Forwarded to ``_parallel_process``.
    :param kwargs: Fixed function parameters.
    :return: List of results.
    """
    worker = functools.partial(batch_operation, func=func, **kwargs)
    return _parallel_process(data_list, worker, batch_size, cpu_count)
def _parallel_iter(par, iterator):
    """
    Parallelize a partial function and return results in a list.

    Each item of ``iterator`` is passed to ``par`` in a worker process.
    Every non-empty worker result is extended into one flat list, in
    completion order.

    :param par: Partial function; expected to return an iterable (or a
        falsy value, which is skipped).
    :param iterator: Iterable object.
    :rtype: list
    :return: List of results.
    """
    output = []
    # The context manager terminates the workers if a task raises, so a
    # failing item does not leave processes behind.
    with mp.Pool(processes=mp.cpu_count(), maxtasksperchild=1) as pool:
        for thread_output in tqdm.tqdm(pool.imap_unordered(par, iterator)):
            if thread_output:
                output.extend(thread_output)
        pool.close()
        pool.join()
    return output
def get_dir_list(file_dir, suffix="", recursive=True):
    """
    Returns directory list from the given path.

    :param str file_dir: Target directory.
    :param str suffix: (Optional.) File extension, Ex: '.tfrecord'.
    :param bool recursive: (Optional.) Search directory recursively.
        Default is True. When False, only entries directly inside
        ``file_dir`` are returned.
    :rtype: list
    :return: Sorted list of matching paths.
    """
    if recursive:
        pattern = os.path.join(file_dir, f'**/*{suffix}')
    else:
        # Without recursive matching '**' behaves like a plain '*', which
        # would skip file_dir itself and look exactly one level down.
        pattern = os.path.join(file_dir, f'*{suffix}')
    return sorted(glob.glob(pattern, recursive=recursive))
def flatten_list(nested_list):
    """
    Flatten one level of nesting.

    :param nested_list: Iterable of iterables.
    :rtype: list
    :return: Items of every inner iterable, in order.
    """
    flat = []
    for sublist in nested_list:
        flat.extend(sublist)
    return flat
def unet_padding_size(trace, pool_size=2, layers=4):
    """
    Return left and right padding size for a given trace.

    The length is divided by ``pool_size`` (rounding up) once per layer,
    then scaled back up; the difference to the original length is the
    padding. All of it is placed on the right side.

    :param np.array trace: Trace array.
    :param int pool_size: (Optional.) Unet pool size, default is 2.
    :param int layers: (Optional.) Unet stages, default is 4.
    :return: (left padding size, right padding size)
    """
    n_samples = len(trace)
    # Length remaining after `layers` rounds of ceil-division pooling.
    bottleneck = functools.reduce(
        lambda width, _: int(np.ceil(width / pool_size)),
        range(layers),
        n_samples,
    )
    rpad = bottleneck * (pool_size ** layers) - n_samples
    return 0, rpad
# Nothing is executed when this module is run as a script.
if __name__ == "__main__": pass