"""This file contains an implementation of end-to-end pipeline execution."""
import argparse
import logging
import os
from os import path
import random
import sys
from time import time
from memory_profiler import memory_usage
from memory_profiler import profile
import numpy as np
import torch
from scalr.data_ingestion_pipeline import DataIngestionPipeline
from scalr.eval_and_analysis_pipeline import EvalAndAnalysisPipeline
from scalr.feature_extraction_pipeline import FeatureExtractionPipeline
from scalr.model_training_pipeline import ModelTrainingPipeline
from scalr.utils import EventLogger
from scalr.utils import FlowLogger
from scalr.utils import read_data
from scalr.utils import set_seed
from scalr.utils import write_data
def get_args():
    """Parse and return the command line arguments.

    Returns:
        argparse.Namespace with fields: `config` (str, required path to the
        config YAML), `log` (bool flag), `level` (str, logging level name),
        `logpath` (str or None), and `memoryprofiler` (bool flag).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-c',
                        '--config',
                        type=str,
                        help='config.yaml file path',
                        required=True)
    parser.add_argument('-l',
                        '--log',
                        action='store_true',
                        help='flag to store logs for the experiment')
    parser.add_argument('--level',
                        type=str,
                        default='INFO',
                        help='set the level of logging')
    # Default is None (not False): the option is typed `str`, and callers only
    # test it for truthiness, so None is the correct "not provided" sentinel.
    parser.add_argument('--logpath',
                        type=str,
                        default=None,
                        help='path to store the logs')
    parser.add_argument('-m',
                        '--memoryprofiler',
                        action='store_true',
                        help='flag to get memory usage analysis')
    args = parser.parse_args()
    return args
# Uncomment `@profile` to get line-by-line memory analysis
# @profile
def pipeline(config, dirpath, device, flow_logger, event_logger):
    """Run the end-to-end pipeline: each stage executes only if its section
    is present in the config, and the config is re-written to disk after
    every stage so a run can be inspected/resumed from its artifacts.

    Args:
        config: User config dict (mutated in place and returned).
        dirpath: Path of root experiment directory.
        device: Device string passed to the GPU-capable stages
            (feature selection, training, analysis).
        flow_logger: Object for flow logger.
        event_logger: Object for event logger.

    Returns:
        The updated config dict.
    """
    if config.get('data'):
        _run_data_ingestion(config, dirpath, flow_logger, event_logger)
    if config.get('feature_selection'):
        _run_feature_selection(config, dirpath, device, flow_logger,
                               event_logger)
    if config.get('final_training'):
        _run_final_training(config, dirpath, device, flow_logger,
                            event_logger)
    if config.get('analysis'):
        _run_analysis(config, dirpath, device, flow_logger, event_logger)
    return config


def _run_data_ingestion(config, dirpath, flow_logger, event_logger):
    """Split, preprocess and (if needed) label-map the raw data; updates `config['data']`."""
    flow_logger.info('Data Ingestion pipeline running')
    event_logger.heading1('Data Ingestion')

    data_dirpath = path.join(dirpath, 'data')
    os.makedirs(data_dirpath, exist_ok=True)

    ingest_data = DataIngestionPipeline(config['data'], data_dirpath)
    ingest_data.generate_train_val_test_split()
    ingest_data.preprocess_data()
    # Mappings are only generated when the user did not supply them.
    if not config['data'].get('label_mappings'):
        ingest_data.generate_mappings()

    config['data'] = ingest_data.get_updated_config()
    write_data(config, path.join(dirpath, 'config.yaml'))
    # Free stage objects eagerly to keep peak memory down between stages.
    del ingest_data


def _run_feature_selection(config, dirpath, device, flow_logger,
                           event_logger):
    """Score features (or load a precomputed score matrix), extract the top
    features and subset the data; updates `config['data']` and
    `config['feature_selection']`."""
    flow_logger.info('Feature Extraction pipeline running')
    event_logger.heading1('Feature Selection')

    feature_extraction_dirpath = path.join(dirpath, 'feature_extraction')
    os.makedirs(feature_extraction_dirpath, exist_ok=True)

    extract_features = FeatureExtractionPipeline(
        config['feature_selection'], feature_extraction_dirpath, device)
    extract_features.load_data_and_targets_from_config(config['data'])

    # A user-supplied score matrix skips the (expensive) subset-model training.
    if not config['feature_selection'].get('score_matrix'):
        extract_features.feature_subsetted_model_training()
        extract_features.feature_scoring()
    else:
        extract_features.set_score_matrix(
            read_data(config['feature_selection'].get('score_matrix')))

    extract_features.top_feature_extraction()
    config['data'] = extract_features.write_top_features_subset_data(
        config['data'])

    config['feature_selection'] = extract_features.get_updated_config()
    write_data(config, path.join(dirpath, 'config.yaml'))
    del extract_features


def _run_final_training(config, dirpath, device, flow_logger, event_logger):
    """Train the final model on the (possibly feature-subsetted) data;
    updates `config['final_training']`."""
    flow_logger.info('Final Model Training pipeline running')
    event_logger.heading1('Final Model Training')

    model_training_dirpath = path.join(dirpath, 'model')
    os.makedirs(model_training_dirpath, exist_ok=True)

    model_trainer = ModelTrainingPipeline(
        config['final_training']['model'],
        config['final_training']['model_train_config'],
        model_training_dirpath, device)
    model_trainer.load_data_and_targets_from_config(config['data'])
    model_trainer.build_model_training_artifacts()
    model_trainer.train()

    model_config, model_train_config = model_trainer.get_updated_config()
    config['final_training']['model'] = model_config
    config['final_training']['model_train_config'] = model_train_config
    write_data(config, path.join(dirpath, 'config.yaml'))
    del model_trainer


def _run_analysis(config, dirpath, device, flow_logger, event_logger):
    """Evaluate the trained model and run downstream analyses; updates
    `config['analysis']`."""
    flow_logger.info('Analysis pipeline running')
    event_logger.heading1('Analysis')

    analysis_dirpath = path.join(dirpath, 'analysis')
    os.makedirs(analysis_dirpath, exist_ok=True)

    # When this run also trained a model, analyse that freshly trained
    # checkpoint, overriding any checkpoint given in the analysis config.
    if config.get('final_training'):
        config['analysis']['model_checkpoint'] = path.join(
            dirpath, 'model', 'best_model')

    analyser = EvalAndAnalysisPipeline(config['analysis'], analysis_dirpath,
                                       device)
    analyser.load_data_and_targets_from_config(config['data'])

    if config['analysis'].get('model_checkpoint'):
        analyser.evaluation_and_classification_report()
        if config['analysis'].get('gene_analysis'):
            analyser.gene_analysis()
    # NOTE(review): `perform_downstream_anlaysis` looks misspelled, but it is
    # the public method name on EvalAndAnalysisPipeline — fix it there first.
    analyser.perform_downstream_anlaysis()

    config['analysis'] = analyser.get_updated_config()
    write_data(config, path.join(dirpath, 'config.yaml'))
    del analyser
if __name__ == '__main__':
    # Fixed seed for reproducible runs.
    set_seed(42)
    args = get_args()
    start_time = time()

    # Parsing config.
    config = read_data(args.config)

    # Setting experiment information from config.
    dirpath = config['experiment']['dirpath']
    exp_name = config['experiment']['exp_name']
    exp_run = config['experiment']['exp_run']
    dirpath = os.path.join(dirpath, f'{exp_name}_{exp_run}')
    device = config['device']

    # Logging: full logging to file when --log is set, otherwise only
    # CRITICAL messages and no log file.
    if args.log:
        # Accept level names case-insensitively and fail fast with a clear
        # message instead of an opaque AttributeError on a bad name.
        level = getattr(logging, args.level.upper(), None)
        if not isinstance(level, int):
            raise ValueError(f'Invalid logging level: {args.level}')
        logpath = args.logpath if args.logpath else path.join(
            dirpath, 'logs.txt')
    else:
        level = logging.CRITICAL
        logpath = None

    flow_logger = FlowLogger('ROOT', level)
    flow_logger.info(f'Experiment directory: `{dirpath}`')
    if os.path.exists(dirpath):
        # Re-running into an existing directory overwrites artifacts.
        flow_logger.warning('Experiment directory already exists!')
    os.makedirs(dirpath, exist_ok=True)
    event_logger = EventLogger('ROOT', level, logpath)

    kwargs = dict(config=config,
                  dirpath=dirpath,
                  device=device,
                  flow_logger=flow_logger,
                  event_logger=event_logger)

    if args.memoryprofiler:
        # Wrap the whole pipeline to record its peak memory usage.
        max_memory = memory_usage((pipeline, [], kwargs),
                                  max_usage=True,
                                  interval=0.5,
                                  max_iterations=1)
    else:
        pipeline(**kwargs)

    end_time = time()
    flow_logger.info(f'Total time taken: {end_time - start_time}')
    event_logger.heading1('Runtime Analysis')
    event_logger.info(f'Total time taken: {end_time - start_time}')
    if args.memoryprofiler:
        flow_logger.info(f'Maximum memory usage: {max_memory}')
        event_logger.info(f'Maximum memory usage: {max_memory}')