"""This file contains functions related to file read-write."""
import json
from math import ceil
import os
from os import path
from typing import Union
from anndata import AnnData
from anndata.experimental import AnnCollection
from anndata.io import read_h5ad
from joblib import delayed
from joblib import Parallel
import numpy as np
import pandas as pd
import yaml
from scalr.utils.logger import FlowLogger


def read_data(
filepath: str,
backed: str = 'r',
index_col: int = 0,
return_anncollection: bool = True
) -> Union[dict, pd.DataFrame, AnnData, AnnCollection]:
"""This function reads a json, yaml, csv or AnnData object file if the file path contains it.
Returns an AnnCollection in case of a directory with chunked anndatas.
Args:
filepath (str): path to `json`, `yaml` or `h5ad` file.
Or directory containing multiple `h5ad` files.
backed (str, optional): To load AnnData / AnnCollection in backed mode. Defaults to 'r'.
Raises:
ValueError: In case of the wrong file path provided.
Returns:
Union[dict, AnnData, AnnCollection].
"""
if filepath.endswith('.json'):
data = read_json(filepath)
elif filepath.endswith('.yaml'):
data = read_yaml(filepath)
elif filepath.endswith('.csv'):
data = read_csv(filepath, index_col=index_col)
elif filepath.endswith('.h5ad'):
data = read_anndata(filepath, backed=backed)
elif path.exists(path.join(filepath, '0.h5ad')):
data = read_chunked_anndatas(filepath,
backed=backed,
return_anncollection=return_anncollection)
else:
raise ValueError(
'''`filepath` is not a `json`, `yaml`, `csv` or `h5ad` file,
or directory containing `h5ad` files.
''')
return data
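
# Example usage (an illustrative sketch; the paths below are hypothetical):
#
#     config = read_data('config.yaml')   # -> dict
#     labels = read_data('labels.csv')    # -> pd.DataFrame
#     adata = read_data('data.h5ad')      # -> AnnData (backed mode 'r')
#     coll = read_data('chunks/')         # -> AnnCollection, if 'chunks/'
#                                         #    contains 0.h5ad, 1.h5ad, ...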


def write_data(data: Union[dict, AnnData, pd.DataFrame], filepath: str):
"""This function writes data to `json`, `yaml`, `csv` or `h5ad` file."""
if filepath.endswith('.json'):
dump_json(data, filepath)
elif filepath.endswith('.yaml'):
dump_yaml(data, filepath)
elif filepath.endswith('.csv'):
dump_csv(data, filepath)
elif filepath.endswith('.h5ad'):
        assert isinstance(
            data,
            AnnData), 'Only AnnData objects can be written as `h5ad` files'
        dump_anndata(data, filepath)
    else:
        raise ValueError(
            '`filepath` does not end with a `json`, `yaml`, `csv`, or `h5ad` extension'
        )
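
# Example usage (an illustrative sketch; the objects and paths are hypothetical):
#
#     write_data({'lr': 1e-3}, 'params.json')   # dict -> json
#     write_data(metrics_df, 'metrics.csv')     # pd.DataFrame -> csv
#     write_data(adata, 'data.h5ad')            # AnnData -> h5ad (type-checked)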


def write_chunkwise_data(full_data: Union[AnnData, AnnCollection],
sample_chunksize: int,
dirpath: str,
sample_inds: list[int] = None,
feature_inds: list[int] = None,
transform: callable = None,
num_workers: int = 1):
"""This function writes data subsets iteratively in a chunkwise manner, to ensure
only at most `sample_chunksize` samples are loaded at a time.
This function can also apply transformation on each chunk.
Args:
full_data (Union[AnnData, AnnCollection]): data to be written in chunks.
sample_chunksize (int): number of samples to be loaded at a time.
dirpath (str): path/to/directory to write the chunks of data.
sample_inds (list[int], optional): To be used in case of chunking
only a subset of samples.
Defaults to all samples.
feature_inds (list[int], optional): To be used in case of writing
only a subset of features.dataframe.
Defaults to all features.
transform (function): a function to apply a transformation on a chunked numpy array.
num_workers (int): Number of jobs to run in parallel for data writing. Additional
workers will not use additional memory, but will be CPU-intensive.
"""
if not path.exists(dirpath):
os.makedirs(dirpath)
if not num_workers:
num_workers = 1
if not sample_inds:
sample_inds = list(range(len(full_data)))
    # Hacky workaround for an AnnCollection indexing bug: keep the chunk size
    # strictly below the total number of samples.
    if sample_chunksize >= len(sample_inds):
        sample_chunksize = len(sample_inds) - 1

    for i, start in enumerate(range(0, len(sample_inds), sample_chunksize)):
if feature_inds:
data = full_data[sample_inds[start:start + sample_chunksize],
feature_inds]
else:
data = full_data[sample_inds[start:start + sample_chunksize]]
if not isinstance(data, AnnData):
data = data.to_adata()
data = data.to_memory(copy=True)
for col in data.obs.columns:
data.obs[col] = data.obs[col].astype('category')
def transform_and_write_data(data: AnnData, chunk_number: int):
"""Internal function to transform a chunk of data and write
it to disk."""
# Handling of empty data
if len(data) == 0:
return
# Transformation
if transform:
data = AnnData(data.X, obs=data.obs, var=data.var)
if not isinstance(data.X, np.ndarray):
data.X = data.X.toarray()
data.X = transform(data.X)
write_data(data, path.join(dirpath, f'{chunk_number}.h5ad'))
        # `num_workers` is always >= 1 here, so split each sample chunk
        # evenly across the workers.
        worker_chunksize = int(ceil(sample_chunksize / num_workers))

        # Execute parallel jobs for transformation and writing of data.
parallel = Parallel(n_jobs=num_workers)
parallel(
delayed(transform_and_write_data)(
data=data[j * worker_chunksize:(j + 1) * worker_chunksize],
chunk_number=i * num_workers + j) for j in range(num_workers))
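
# Example usage (an illustrative sketch; `adata` and the scaling transform are
# hypothetical):
#
#     write_chunkwise_data(adata,
#                          sample_chunksize=1000,
#                          dirpath='processed/',
#                          transform=lambda x: x / x.max(),
#                          num_workers=4)
#
# This writes processed/0.h5ad, processed/1.h5ad, ... which `read_data` can
# load back as an AnnCollection.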


def _get_datapath_from_config(data_config):
"""This function returns the datapath to be used to read from config.
Args:
data_config: Data config.
"""
flow_logger = FlowLogger('File Utils')
train_val_test_paths = data_config.get('train_val_test')
datapath = None
    if not train_val_test_paths:
        raise ValueError('`train_val_test` datapaths not given in data config')

    if train_val_test_paths.get('feature_subset_datapaths'):
        datapath = 'feature_subset_datapaths'
        flow_logger.info('Data loaded from feature subset datapaths')
    elif train_val_test_paths.get('final_datapaths'):
        datapath = 'final_datapaths'
        flow_logger.info('Data loaded from final datapaths')
    elif train_val_test_paths.get('split_datapaths'):
        datapath = 'split_datapaths'
        flow_logger.info('Data loaded from split datapaths')
    else:
        raise ValueError('No datapaths found in `train_val_test` config')
return datapath
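
# The datapath key is resolved in priority order. A minimal config sketch
# (values are hypothetical):
#
#     data_config = {
#         'train_val_test': {
#             'split_datapaths': 'out/splits',
#             'final_datapaths': 'out/final',
#             # 'feature_subset_datapaths' would take precedence if present.
#         }
#     }
#     _get_datapath_from_config(data_config)  # -> 'final_datapaths'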


def load_train_val_data_from_config(data_config):
"""This function returns train & validation data from the data config.
Args:
data_config: Data config.
"""
train_val_test_paths = data_config.get('train_val_test')
datapath = _get_datapath_from_config(data_config)
train_data = read_data(path.join(train_val_test_paths[datapath], 'train'))
val_data = read_data(path.join(train_val_test_paths[datapath], 'val'))
return train_data, val_data


def load_test_data_from_config(data_config):
"""This function returns test data from the data config.
Args:
data_config: Data config.
"""
train_val_test_paths = data_config.get('train_val_test')
datapath = _get_datapath_from_config(data_config)
test_data = read_data(path.join(train_val_test_paths[datapath], 'test'))
return test_data


def load_full_data_from_config(data_config):
"""This function returns full data from the data config.
Args:
data_config: Data config.
"""
train_val_test_paths = data_config.get('train_val_test')
datapath = _get_datapath_from_config(data_config)
full_datas = []
full_datas += read_data(path.join(train_val_test_paths[datapath], 'train'),
return_anncollection=False)
full_datas += read_data(path.join(train_val_test_paths[datapath], 'val'),
return_anncollection=False)
full_datas += read_data(path.join(train_val_test_paths[datapath], 'test'),
return_anncollection=False)
full_data = AnnCollection(full_datas)
return full_data
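
# Example usage (an illustrative sketch, assuming each datapath directory
# contains `train`, `val` and `test` subdirectories of chunked anndatas):
#
#     train_data, val_data = load_train_val_data_from_config(data_config)
#     test_data = load_test_data_from_config(data_config)
#     full_data = load_full_data_from_config(data_config)  # all splits combined
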
# Readers


def read_yaml(filepath: str) -> dict:
"""This function returns the config file loaded from yaml."""
with open(filepath, 'r') as fh:
config = yaml.safe_load(fh)
return config


def read_json(filepath: str) -> dict:
"""This file returns the json file object."""
with open(filepath, 'r') as fh:
config = json.load(fh)
return config


def read_csv(filepath: str, index_col: int = 0) -> pd.DataFrame:
"""This file returns the DataFrame file object."""
return pd.read_csv(filepath, index_col=index_col)


def read_anndata(filepath: str, backed: str = 'r') -> AnnData:
"""This file returns the Anndata object from filepath."""
data = read_h5ad(filepath, backed=backed)
return data


def read_chunked_anndatas(dirpath: str,
backed: str = 'r',
                          return_anncollection: bool = True
                          ) -> Union[AnnCollection, list]:
    """This function returns an AnnCollection object (or a list of AnnData
    objects when `return_anncollection` is False) built from the multiple
    chunked anndatas in the `dirpath` directory.
    """
datas = []
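    # Read consecutively numbered chunks (`0.h5ad`, `1.h5ad`, ...) until a
    # number is missing.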
for i in range(len(os.listdir(dirpath))):
if os.path.exists(path.join(dirpath, f'{i}.h5ad')):
datas.append(
read_anndata(path.join(dirpath, f'{i}.h5ad'), backed=backed))
else:
break
data = AnnCollection(datas)
if return_anncollection:
return data
else:
return datas
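
# Example usage (an illustrative sketch; 'chunks/' is a hypothetical directory
# holding 0.h5ad, 1.h5ad, ...):
#
#     collection = read_chunked_anndatas('chunks/')  # -> AnnCollection
#     adata_list = read_chunked_anndatas('chunks/',
#                                        return_anncollection=False)  # -> list
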
# Writers


def dump_json(config: dict, filepath: str):
"""This function stores the json file to filepath."""
with open(filepath, 'w') as fh:
config = json.dump(config, fh, indent=2)
return


def dump_yaml(config: dict, filepath: str):
"""This function stores the config file to filepath."""
with open(filepath, 'w') as fh:
        yaml.dump(config, fh)
return


def dump_csv(df: pd.DataFrame, filepath: str):
"""This function stores the config file to filepath."""
df.to_csv(filepath)
return


def dump_anndata(adata: AnnData, filepath: str):
"""This function writes the AnnData to filepath."""
adata.write(filepath, compression="gzip")