Source code for scalr.utils.file_utils

"""This file contains functions related to file read-write."""

import json
from math import ceil
import os
from os import path
from typing import Union

from anndata import AnnData
from anndata.experimental import AnnCollection
from anndata.io import read_h5ad
from joblib import delayed
from joblib import Parallel
import numpy as np
import pandas as pd
import yaml

from scalr.utils.logger import FlowLogger


def read_data(
        filepath: str,
        backed: str = 'r',
        index_col: int = 0,
        return_anncollection: bool = True
) -> Union[dict, pd.DataFrame, AnnData, AnnCollection, list]:
    """This function reads a `json`, `yaml`, `csv` or AnnData object file
    if the file path contains it. Returns an AnnCollection in case of a
    directory with chunked anndatas.

    Args:
        filepath (str): Path to a `json`, `yaml`, `csv` or `h5ad` file, or a
            directory containing multiple `h5ad` files.
        backed (str, optional): Whether to load AnnData / AnnCollection in
            backed mode. Defaults to 'r'.
        index_col (int, optional): Column to use as row labels when reading
            a `csv` file. Defaults to 0.
        return_anncollection (bool, optional): Whether to return chunked
            anndatas as a single AnnCollection (True) or as a list of
            AnnData objects (False). Defaults to True.

    Raises:
        ValueError: In case a wrong file path is provided.

    Returns:
        Union[dict, pd.DataFrame, AnnData, AnnCollection, list]
    """
    if filepath.endswith('.json'):
        data = read_json(filepath)
    elif filepath.endswith('.yaml'):
        data = read_yaml(filepath)
    elif filepath.endswith('.csv'):
        data = read_csv(filepath, index_col=index_col)
    elif filepath.endswith('.h5ad'):
        data = read_anndata(filepath, backed=backed)
    elif path.exists(path.join(filepath, '0.h5ad')):
        data = read_chunked_anndatas(filepath,
                                     backed=backed,
                                     return_anncollection=return_anncollection)
    else:
        raise ValueError(
            '`filepath` is not a `json`, `yaml`, `csv` or `h5ad` file, '
            'or a directory containing `h5ad` files.')
    return data

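A minimal usage sketch (the file paths below are hypothetical, not part of
this module): `read_data` dispatches on the file extension, and a directory
holding `0.h5ad`, `1.h5ad`, ... is read as chunked anndatas.

config = read_data('config.yaml')                          # -> dict
labels = read_data('labels.csv', index_col=0)              # -> pd.DataFrame
adata = read_data('data.h5ad', backed='r')                 # -> AnnData, backed mode
chunks = read_data('train/', return_anncollection=False)   # -> list of AnnData
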
def write_data(data: Union[dict, AnnData, pd.DataFrame], filepath: str):
    """This function writes data to a `json`, `yaml`, `csv` or `h5ad` file."""
    if filepath.endswith('.json'):
        dump_json(data, filepath)
    elif filepath.endswith('.yaml'):
        dump_yaml(data, filepath)
    elif filepath.endswith('.csv'):
        dump_csv(data, filepath)
    elif filepath.endswith('.h5ad'):
        assert isinstance(
            data,
            AnnData), 'Only AnnData objects can be written as `h5ad` files'
        dump_anndata(data, filepath)
    else:
        raise ValueError(
            '`filepath` does not contain a `json`, `yaml`, `csv` or `h5ad` file')

def write_chunkwise_data(full_data: Union[AnnData, AnnCollection],
                         sample_chunksize: int,
                         dirpath: str,
                         sample_inds: list[int] = None,
                         feature_inds: list[int] = None,
                         transform: callable = None,
                         num_workers: int = 1):
    """This function writes data subsets iteratively in a chunkwise manner,
    to ensure that at most `sample_chunksize` samples are loaded at a time.

    This function can also apply a transformation on each chunk.

    Args:
        full_data (Union[AnnData, AnnCollection]): Data to be written in chunks.
        sample_chunksize (int): Number of samples to be loaded at a time.
        dirpath (str): Path to the directory to write the chunks of data to.
        sample_inds (list[int], optional): To be used in case of chunking
            only a subset of samples. Defaults to all samples.
        feature_inds (list[int], optional): To be used in case of writing
            only a subset of features. Defaults to all features.
        transform (callable): A function to apply a transformation on a
            chunked numpy array.
        num_workers (int): Number of jobs to run in parallel for data
            writing. Additional workers will not use additional memory,
            but will be CPU-intensive.
    """
    if not path.exists(dirpath):
        os.makedirs(dirpath)

    if not num_workers:
        num_workers = 1

    if not sample_inds:
        sample_inds = list(range(len(full_data)))

    # Hacky fix for an AnnCollection indexing bug.
    if sample_chunksize >= len(sample_inds):
        sample_chunksize = len(sample_inds) - 1

    for i, start in enumerate(range(0, len(sample_inds), sample_chunksize)):
        if feature_inds:
            data = full_data[sample_inds[start:start + sample_chunksize],
                             feature_inds]
        else:
            data = full_data[sample_inds[start:start + sample_chunksize]]

        if not isinstance(data, AnnData):
            data = data.to_adata()
        data = data.to_memory(copy=True)

        for col in data.obs.columns:
            data.obs[col] = data.obs[col].astype('category')

        def transform_and_write_data(data: AnnData, chunk_number: int):
            """Internal function to transform a chunk of data and write it
            to disk."""
            # Handling of empty data.
            if len(data) == 0:
                return

            # Transformation.
            if transform:
                data = AnnData(data.X, obs=data.obs, var=data.var)
                if not isinstance(data.X, np.ndarray):
                    data.X = data.X.toarray()
                data.X = transform(data.X)

            write_data(data, path.join(dirpath, f'{chunk_number}.h5ad'))

        # `num_workers` is guaranteed >= 1 by the check above.
        worker_chunksize = int(ceil(sample_chunksize / num_workers))

        # Execute parallel jobs for transformation and writing of data.
        parallel = Parallel(n_jobs=num_workers)
        parallel(
            delayed(transform_and_write_data)(
                data=data[j * worker_chunksize:(j + 1) * worker_chunksize],
                chunk_number=i * num_workers + j) for j in range(num_workers))

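A usage sketch for `write_chunkwise_data`, assuming `adata` is an AnnData
already loaded (e.g. via `read_data`) and `scaler` is a fitted scikit-learn
`StandardScaler` (both hypothetical here):

# Write `adata` to processed/train/ as 0.h5ad, 1.h5ad, ..., loading at
# most 1000 samples at a time and standard-scaling each chunk.
write_chunkwise_data(full_data=adata,
                     sample_chunksize=1000,
                     dirpath='processed/train',
                     transform=scaler.transform,
                     num_workers=4)
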
def _get_datapath_from_config(data_config):
    """This function returns which datapaths key from the config should be
    used for reading data.

    Args:
        data_config: Data config.
    """
    flow_logger = FlowLogger('File Utils')

    train_val_test_paths = data_config.get('train_val_test')
    datapath = None
    if not train_val_test_paths:
        raise ValueError('Split Datapaths not given')

    if train_val_test_paths.get('feature_subset_datapaths'):
        datapath = 'feature_subset_datapaths'
        flow_logger.info('Data Loaded from Feature subset datapaths')
    elif train_val_test_paths.get('final_datapaths'):
        datapath = 'final_datapaths'
        flow_logger.info('Data Loaded from Final datapaths')
    elif train_val_test_paths.get('split_datapaths'):
        datapath = 'split_datapaths'
        flow_logger.info('Data Loaded from Split datapaths')
    else:
        raise ValueError('No datapaths given in config')

    return datapath

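For reference, a sketch of the `data_config` structure these helpers expect,
inferred from the keys read above (the paths are hypothetical and the real
config may carry more fields):

data_config = {
    'train_val_test': {
        # Each path points to a directory containing 'train', 'val' and
        # 'test' subdirectories of chunked h5ad files. The most processed
        # datapaths available are preferred, in this order:
        'feature_subset_datapaths': 'data/feature_subset',
        'final_datapaths': 'data/final',
        'split_datapaths': 'data/splits',
    }
}
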
def load_train_val_data_from_config(data_config):
    """This function returns train & validation data from the data config.

    Args:
        data_config: Data config.
    """
    train_val_test_paths = data_config.get('train_val_test')
    datapath = _get_datapath_from_config(data_config)

    train_data = read_data(path.join(train_val_test_paths[datapath], 'train'))
    val_data = read_data(path.join(train_val_test_paths[datapath], 'val'))

    return train_data, val_data

def load_test_data_from_config(data_config):
    """This function returns test data from the data config.

    Args:
        data_config: Data config.
    """
    train_val_test_paths = data_config.get('train_val_test')
    datapath = _get_datapath_from_config(data_config)

    test_data = read_data(path.join(train_val_test_paths[datapath], 'test'))

    return test_data

def load_full_data_from_config(data_config):
    """This function returns the full data from the data config.

    Args:
        data_config: Data config.
    """
    train_val_test_paths = data_config.get('train_val_test')
    datapath = _get_datapath_from_config(data_config)

    full_datas = []
    full_datas += read_data(path.join(train_val_test_paths[datapath], 'train'),
                            return_anncollection=False)
    full_datas += read_data(path.join(train_val_test_paths[datapath], 'val'),
                            return_anncollection=False)
    full_datas += read_data(path.join(train_val_test_paths[datapath], 'test'),
                            return_anncollection=False)
    full_data = AnnCollection(full_datas)
    return full_data

# Readers
def read_yaml(filepath: str) -> dict:
    """This function returns the config loaded from a yaml file."""
    with open(filepath, 'r') as fh:
        config = yaml.safe_load(fh)
    return config

def read_json(filepath: str) -> dict:
    """This function returns the object loaded from a json file."""
    with open(filepath, 'r') as fh:
        config = json.load(fh)
    return config

def read_csv(filepath: str, index_col: int = 0) -> pd.DataFrame:
    """This function returns a DataFrame read from a csv file."""
    return pd.read_csv(filepath, index_col=index_col)

def read_anndata(filepath: str, backed: str = 'r') -> AnnData:
    """This function returns the AnnData object read from filepath."""
    data = read_h5ad(filepath, backed=backed)
    return data

def read_chunked_anndatas(
        dirpath: str,
        backed: str = 'r',
        return_anncollection: bool = True) -> Union[AnnCollection, list]:
    """This function returns an AnnCollection object built from multiple
    anndatas in the `dirpath` directory, or a list of the underlying
    AnnData objects if `return_anncollection` is False.
    """
    datas = []
    for i in range(len(os.listdir(dirpath))):
        if os.path.exists(path.join(dirpath, f'{i}.h5ad')):
            datas.append(
                read_anndata(path.join(dirpath, f'{i}.h5ad'), backed=backed))
        else:
            break
    if return_anncollection:
        return AnnCollection(datas)
    return datas

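Note that `read_chunked_anndatas` only picks up consecutively numbered files
(`0.h5ad`, `1.h5ad`, ...) and stops at the first missing index, matching the
naming scheme used by `write_chunkwise_data` above. A small round-trip sketch
(hypothetical `adata` and paths):

write_chunkwise_data(full_data=adata, sample_chunksize=500, dirpath='tmp/chunks')
chunks = read_chunked_anndatas('tmp/chunks', return_anncollection=False)
collection = read_chunked_anndatas('tmp/chunks')  # same chunks, one AnnCollection
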
# Writers
def dump_json(config: dict, filepath: str):
    """This function stores the given object as json at filepath."""
    with open(filepath, 'w') as fh:
        json.dump(config, fh, indent=2)

def dump_yaml(config: dict, filepath: str):
    """This function stores the config as yaml at filepath."""
    with open(filepath, 'w') as fh:
        yaml.dump(config, fh)

def dump_csv(df: pd.DataFrame, filepath: str):
    """This function stores the DataFrame as a csv file at filepath."""
    df.to_csv(filepath)

def dump_anndata(adata: AnnData, filepath: str):
    """This function writes the AnnData to filepath."""
    adata.write(filepath, compression="gzip")