In this tutorial we will learn how to create an advanced task to execute a Blast. In this tutorial we will see how we can
- download file from an external source
- create a virtual environment
- prepare the data
- execute blast in the virtual environment
- clean data after the run
In this tutorial we will create a task that does the same as the following tutorial in Constellab environment : https://angus.readthedocs.io/en/2019/running-command-line-blast.html#running-blast
Create the task
First create a Task named Blast and define input, output and config as following :
@task_decorator("Blast", human_name="Blast")
class Blast(Task):
input_specs = {'input_file': InputSpec(File, human_name="Compressed fasta file",
short_description="The fasta.gz file to compare with db"),
}
output_specs = {'blast_result': OutputSpec(File, human_name="Blast result file",
short_description="Result file generated by the blast command")}
config_specs = {
'db_path': StrParam( default_value="https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/zebrafish.1.protein.faa.gz",
human_name="Database path", short_description="Must be a .faa.gz file"),
'head': IntParam(default_value=0, human_name="Limit input file read",
short_description="Number of line in the input file to compare. 0 for all lines")}
This task takes a compressed fasta file as input returns the blast output as result. It has 2 configurations :
-
db_path
: path of the db to download to compares sequences -
head
: use to limite the input size to faster the blast.
Download the database
To make the Task
flexible we provided a config to download the database form an external source. To download the database, we will use the TaskFileDownloader
. This simplify the download of external source and can detect if the file was already downloaded to prevent re-downloading it. Here is the documentation about this : Use external data
Here is the code to download the database
# Download the database
# retrieve the db url from the param
db_url: str = params['db_path']
# extract the filename from the url
db_file_name = db_url.split('/')[-1]
# create the file downloader using the current task brick name,
# by passing the brick name of the Task, the file will be downloaded in a specific location for the brick
# also pass the message_dispatcher to log downlod progress in the task messag
file_downloader = TaskFileDownloader(Blast.get_brick_name(), self.message_dispatcher)
# download the db and retrieve the path of the downloaded file
zebra_zipped_db = file_downloader.download_file_if_missing(db_url, db_file_name)
After this code, the downloaded db will be available and the path of the file available in zebra_zipped_db
.
Create the virtual environment
To run the blast we will need the blastp
command line . We will install it in a Conda virtual environment so this will not interfere with current environment. Here is the documentation about virtual environment : Virtual environment and command line.
To create the environment we will use the CondaShellProxy
class. We will delegate the env creation to an external class of our Task
named BlastEnvHelper
. Having a external class to manage the virtual environment is a good practice as multiple Task
can use the same environment.
Here is the BlastEnvHelper
class
class BlastEnvHelper():
# define the name of the virtual environment, it must be unique
CONDA_ENV_DIR_NAME = "BlastCondaEnv"
# path of the yaml environment file. The file blast_conda.yml must be in the same folder as this file
CONDA_ENV_FILE_PATH = os.path.join(os.path.abspath(
os.path.dirname(__file__)), "blast_conda.yml")
# method to create the conda shell proxy.
# we pass the MessageDispatcher so that the output of the command line will be logged in the Task
@classmethod
def create_conda_proxy(cls, message_dispatcher: MessageDispatcher = None) -> CondaShellProxy:
return CondaShellProxy(cls.CONDA_ENV_DIR_NAME, cls.CONDA_ENV_FILE_PATH, message_dispatcher=message_dispatcher)
This required a yaml
environment file, named blast_conda.yml
in the same folder as the above file. Here is the content of the conda environment file which defined the required package to execute the blast.
channels:
- bioconda
- conda-forge
dependencies:
- python=3.8
- biopython
- blast==2.13.0
Now we need to create the ShellProxy
in our Task
to be able to run command in the virtual environment :
shell_proxy = BlastEnvHelper.create_conda_proxy( self.message_dispatcher)
Prepare the data
Now we will follow the blast tutorial to prepare the data to run the blast. We will need to :
- Unzip the database and create the blast database from the unzipped file
- Retrieve the input file, unzip it, get the first X lines based on the config and move the result in the working directory
Let's see the code :
############################ Prepare the DB ############################
zebra_db = "zebra_fish_db.faa"
# Unzip db file in the working directory and verify the result
result = shell_proxy.run([f"gunzip -c {zebra_zipped_db} > {zebra_db}"])
if result != 0:
raise Exception('Error during the unzip of database .gz file')
# Create the blast db in the working directory and check the result
# all the shell command are executed in the virtual environment, so the makeblastdb is available
result = shell_proxy.run([f"makeblastdb -in {zebra_db} -dbtype prot -out {zebra_db}"])
if result != 0:
raise Exception('Error during the creation of the blast db')
############################ Prepare the input file ############################
# retrive the input table
file: File = inputs['input_file']
# Unzip the input file in the working directory and verify the result
input_file_unzipped = "input.faa"
result = shell_proxy.run([f"gunzip -c {file.path} > {input_file_unzipped}"])
if result != 0:
raise Exception('Error during the unzip of .gz file')
# Limit the number of lines to compare if needed
head: int = params['head']
file_to_compare: str = None
# limit the number of lines
if head > 0:
# use the head command to limit the number of lines
sub_file = 'sub_input_file.faa'
result = shell_proxy.run([f"head -n {head} {input_file_unzipped} > {sub_file}"])
if result != 0:
raise Exception('Error during the head command')
file_to_compare = sub_file
else:
# no need to limit the number of line
file_to_compare = input_file_unzipped
After this code we will have the Db ready in the working directory, named zebra_db
and our input file ready in the working directory named file_to_compare
.
Execute the blast
Now that everything is ready, this is the ease part, we need to execute the blast and return the result.
############################ Run the blast and retrieve reulst ############################
output_file_name = 'output.txt'
# run the blast
result = shell_proxy.run([f"blastp -query {file_to_compare} -db {zebra_db} -out {output_file_name}"])
if result != 0:
raise Exception('Error during the blast')
# get the absolute path of the output
output_file_path = os.path.join(shell_proxy.working_dir, output_file_name)
# create the output Resource (File)
output_file = File(output_file_path)
# return the output table
return {'blast_result': output_file}
Clean the data after the run
As a lot of files were creating during this process (unzip and database creation), it is a good practice to clear the working directory after the run of the task.
To do this, we will implement the run_after_task
method of the Task. Here is the documentation about it : Run after task
def run_after_task(self) -> None:
# use to delete the temp folder once the task is done and output resources saved
# this is safe to do it here becase the output resource was move to the Resource location
if self.shell_proxy:
self.shell_proxy.clean_working_dir()
To have access to the shell_proxy
, we need to store in in an attribute during the run method:
# store the shell_proxy in the class to be able to use it in the run_after_task method
self.shell_proxy = shell_proxy
Our task is done 💪. Let's see the complete code :
import os
from gws_core import (CondaShellProxy, ConfigParams, File, InputSpec,
MessageDispatcher, OutputSpec, PipShellProxy, Task,
TaskFileDownloader, TaskInputs, TaskOutputs,
task_decorator)
from gws_core.config.param.param_spec import IntParam, StrParam
from gws_core.impl.shell.shell_proxy import ShellProxy
class BlastEnvHelper():
CONDA_ENV_DIR_NAME = "BlastCondaEnv"
PIP_ENV_DIR_NAME = "BlastPipEnv"
CONDA_ENV_FILE_PATH = os.path.join(os.path.abspath(
os.path.dirname(__file__)), "blast_conda.yml")
PIP_ENV_FILE_PATH = os.path.join(os.path.abspath(
os.path.dirname(__file__)), "blast_pipenv.txt")
@classmethod
def create_conda_proxy(cls, message_dispatcher: MessageDispatcher = None) -> CondaShellProxy:
return CondaShellProxy(cls.CONDA_ENV_DIR_NAME, cls.CONDA_ENV_FILE_PATH, message_dispatcher=message_dispatcher)
@classmethod
def create_pip_proxy(cls, message_dispatcher: MessageDispatcher = None) -> PipShellProxy:
return PipShellProxy(cls.PIP_ENV_DIR_NAME, cls.PIP_ENV_FILE_PATH, message_dispatcher=message_dispatcher)
@task_decorator("Blast", human_name="Blast")
class Blast(Task):
input_specs = {'input_file': InputSpec(File, human_name="Compressed fasta file",
short_description="The fasta.gz file to compare with db"),
}
output_specs = {'blast_result': OutputSpec(File, human_name="Blast result file",
short_description="Result file generated by the blast command")}
config_specs = {
'db_path': StrParam(
default_value="https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/zebrafish.1.protein.faa.gz",
human_name="Database path", short_description="Must be a .faa.gz file"),
'head': IntParam(default_value=0, human_name="Limit input file read",
short_description="Number of line in the input file to compare. 0 for all lines")}
shell_proxy: ShellProxy = None
def run(self, params: ConfigParams, inputs: TaskInputs) -> TaskOutputs:
""" Run the task """
############################ Download the database ############################
# retrieve the db url from the param
db_url: str = params['db_path']
# extract the filename from the url
db_file_name = db_url.split('/')[-1]
# create the file downloader using the current task brick name,
# by passing the brick name of the Task, the file will be downloaded in a specific location for the brick
# also pass the message_dispatcher to log downlod progress in the task messag
file_downloader = TaskFileDownloader(Blast.get_brick_name(), self.message_dispatcher)
# download the db and retrieve the path of the downloaded file
zebra_zipped_db = file_downloader.download_file_if_missing(db_url, db_file_name)
############################ Create the shell proxy ############################
shell_proxy = BlastEnvHelper.create_conda_proxy(
self.message_dispatcher)
# store the shell_proxy in the class to be able to use it in the run_after_task method
self.shell_proxy = shell_proxy
############################ Prepare the DB ############################
zebra_db = "zebra_fish_db.faa"
# Unzip db file in the working directory and verify the result
result = shell_proxy.run([f"gunzip -c {zebra_zipped_db} > {zebra_db}"])
if result != 0:
raise Exception('Error during the unzip of database .gz file')
# Create the blast db in the working directory and check the result
# all the shell command are executed in the virtual environment, so the makeblastdb is available
result = shell_proxy.run([f"makeblastdb -in {zebra_db} -dbtype prot -out {zebra_db}"])
if result != 0:
raise Exception('Error during the creation of the blast db')
############################ Prepare the input file ############################
# retrive the input table
file: File = inputs['input_file']
# Unzip the input file in the working directory and verify the result
input_file_unzipped = "input.faa"
result = shell_proxy.run([f"gunzip -c {file.path} > {input_file_unzipped}"])
if result != 0:
raise Exception('Error during the unzip of .gz file')
# Limit the number of lines to compare if needed
head: int = params['head']
file_to_compare: str = None
# limit the number of lines
if head > 0:
# use the head command to limit the number of lines
sub_file = 'sub_input_file.faa'
result = shell_proxy.run([f"head -n {head} {input_file_unzipped} > {sub_file}"])
if result != 0:
raise Exception('Error during the head command')
file_to_compare = sub_file
else:
# no need to limit the number of line
file_to_compare = input_file_unzipped
############################ Run the blast and retrieve reulst ############################
output_file_name = 'output.txt'
# run the blast
result = shell_proxy.run([f"blastp -query {file_to_compare} -db {zebra_db} -out {output_file_name}"])
if result != 0:
raise Exception('Error during the blast')
# get the absolute path of the output
output_file_path = os.path.join(
shell_proxy.working_dir, output_file_name)
# create the output Resource (File)
output_file = File(output_file_path)
# return the output table
return {'blast_result': output_file}
def run_after_task(self) -> None:
# use to delete the temp folder once the task is done and output resources saved
# this is safe to do it here becase the output resource was move to the Resource location
if self.shell_proxy:
self.shell_proxy.clean_working_dir()
Execute the task
To execute the task, you will need to upload the input data in our lab and configure it.
Here is a link of and example for the input data : https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/mouse.1.protein.faa.gz
In the config set the db path to : https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/zebrafish.1.protein.faa.gz
And set the head to 11
as in the Blast tutorial to make the execution faster.