In this tutorial we will learn how to create an advanced task that executes a Blast. We will see how to:
- download a file from an external source
- create a virtual environment
- prepare the data
- execute Blast in the virtual environment
- clean the data after the run
We will create a task that does the same as the following tutorial, inside the Constellab environment: https://angus.readthedocs.io/en/2019/running-command-line-blast.html#running-blast
Create the task
First, create a Task named Blast and define its inputs, outputs and config as follows:
@task_decorator("Blast", human_name="Blast")
class Blast(Task):
    input_specs = {'input_file': InputSpec(File, human_name="Compressed fasta file",
                                           short_description="The fasta.gz file to compare with db")}
    output_specs = {'blast_result': OutputSpec(File, human_name="Blast result file",
                                               short_description="Result file generated by the blast command")}
    config_specs = {
        'db_path': StrParam(
            default_value="https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/zebrafish.1.protein.faa.gz",
            human_name="Database path", short_description="Must be a .faa.gz file"),
        'head': IntParam(default_value=0, human_name="Limit input file read",
                         short_description="Number of lines in the input file to compare. 0 for all lines")}
This task takes a compressed fasta file as input and returns the Blast output as result. It has 2 config parameters:
db_path
: path of the database to download, used to compare sequences
head
: used to limit the input size and speed up the Blast
Download the database
To make the Task flexible, we provide a config parameter to download the database from an external source. For the download we will use the TaskFileDownloader. It simplifies downloading from an external source and can detect that the file was already downloaded, to avoid re-downloading it. Here is the documentation about it: https://constellab.community/bricks/gws_core/latest/doc/developer-guide/task/task#use-external-data
Here is the code to download the database:
# Download the database
# retrieve the db url from the param
db_url: str = params['db_path']
# extract the filename from the url
db_file_name = db_url.split('/')[-1]
# create the file downloader using the current task's brick name;
# by passing the brick name of the Task, the file will be downloaded in a location specific to the brick
# also pass the message_dispatcher to log download progress in the task messages
file_downloader = TaskFileDownloader(Blast.get_brick_name(), self.message_dispatcher)
# download the db and retrieve the path of the downloaded file
zebra_zipped_db = file_downloader.download_file_if_missing(db_url, db_file_name)
After this code, the downloaded db is available and its path is stored in zebra_zipped_db.
If we re-execute this task, the database will not be downloaded again as long as db_file_name didn't change.
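The caching behavior can be illustrated with a minimal, hypothetical sketch in plain Python. This is not the gws_core implementation, just the general download-if-missing pattern:

```python
import os
import urllib.request


def download_if_missing(url: str, destination_dir: str, file_name: str) -> str:
    """Download url into destination_dir/file_name unless the file is already there."""
    destination = os.path.join(destination_dir, file_name)
    if os.path.exists(destination):
        # the file was already downloaded: skip the download entirely
        return destination
    os.makedirs(destination_dir, exist_ok=True)
    urllib.request.urlretrieve(url, destination)
    return destination
```

Because the cache is keyed on the file name, changing the file name (for example when the url points to a new file) triggers a fresh download.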
Create the virtual environment
To run the Blast we will need the blastp command line tool. We will install it in a Conda virtual environment so that it does not interfere with the current environment. Here is the documentation about virtual environments: https://constellab.community/bricks/gws_core/latest/doc/developer-guide/virtual-environment-and-command-line
To create the environment we will use the CondaShellProxy class. We will delegate the env creation to a class external to our Task, named BlastEnvHelper. Having an external class to manage the virtual environment is a good practice, as multiple Tasks can use the same environment.
Here is the BlastEnvHelper class:
class BlastEnvHelper():
    # define the name of the virtual environment, it must be unique
    CONDA_ENV_DIR_NAME = "BlastCondaEnv"

    # path of the yaml environment file. The file blast_conda.yml must be in the same folder as this file
    CONDA_ENV_FILE_PATH = os.path.join(os.path.abspath(
        os.path.dirname(__file__)), "blast_conda.yml")

    # method to create the conda shell proxy.
    # we pass the MessageDispatcher so that the output of the command lines is logged in the Task
    @classmethod
    def create_conda_proxy(cls, message_dispatcher: MessageDispatcher = None) -> CondaShellProxy:
        return CondaShellProxy(cls.CONDA_ENV_DIR_NAME, cls.CONDA_ENV_FILE_PATH, message_dispatcher=message_dispatcher)
This requires a yaml environment file named blast_conda.yml, placed in the same folder as the above file. Here is the content of the conda environment file, which defines the packages required to execute the Blast:
channels:
- bioconda
- conda-forge
dependencies:
- python=3.8
- biopython
- blast==2.13.0
Now we need to create the ShellProxy in our Task to be able to run commands in the virtual environment:
shell_proxy = BlastEnvHelper.create_conda_proxy(self.message_dispatcher)
We provide self.message_dispatcher so that the output of the commands executed by the proxy is logged in the Task.
Prepare the data
Now we will follow the Blast tutorial to prepare the data to run the Blast. We will need to:
- Unzip the database and create the blast database from the unzipped file
- Retrieve the input file, unzip it, keep the first X lines based on the config and place the result in the working directory
Let's see the code:
############################ Prepare the DB ############################
zebra_db = "zebra_fish_db.faa"
# Unzip the db file in the working directory and verify the result
result = shell_proxy.run([f"gunzip -c {zebra_zipped_db} > {zebra_db}"])
if result != 0:
    raise Exception('Error during the unzip of the database .gz file')
# Create the blast db in the working directory and check the result
# all the shell commands are executed in the virtual environment, so makeblastdb is available
result = shell_proxy.run([f"makeblastdb -in {zebra_db} -dbtype prot -out {zebra_db}"])
if result != 0:
    raise Exception('Error during the creation of the blast db')

############################ Prepare the input file ############################
# retrieve the input file
file: File = inputs['input_file']
# Unzip the input file in the working directory and verify the result
input_file_unzipped = "input.faa"
result = shell_proxy.run([f"gunzip -c {file.path} > {input_file_unzipped}"])
if result != 0:
    raise Exception('Error during the unzip of the .gz file')

# Limit the number of lines to compare if needed
head: int = params['head']
file_to_compare: str = None
if head > 0:
    # use the head command to limit the number of lines
    sub_file = 'sub_input_file.faa'
    result = shell_proxy.run([f"head -n {head} {input_file_unzipped} > {sub_file}"])
    if result != 0:
        raise Exception('Error during the head command')
    file_to_compare = sub_file
else:
    # no need to limit the number of lines
    file_to_compare = input_file_unzipped
After this code, the db is ready in the working directory under the name stored in zebra_db, and our input file is ready under the name stored in file_to_compare.
Note: all the commands are run with the shell_proxy. This means they are all executed in the same working directory, which is why we can use relative file paths.
Note: all the commands are run with the shell_proxy. This also means they are all executed in the virtual environment, which is why the makeblastdb command is available.
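Conceptually, the proxy behaves like a thin wrapper that runs every command through the shell with a fixed working directory. The class below is a simplified, hypothetical sketch (not the actual CondaShellProxy code) that shows why relative paths work across successive commands:

```python
import subprocess


class MiniShellProxy:
    """Simplified illustration: every command runs in the same working directory."""

    def __init__(self, working_dir: str):
        self.working_dir = working_dir

    def run(self, command: str) -> int:
        # cwd is fixed for every call, so relative paths written by one
        # command are visible to the next one
        completed = subprocess.run(command, shell=True, cwd=self.working_dir)
        return completed.returncode
```

With such a proxy, a first command that writes a relative file (for example an unzip) and a later command that reads it both resolve the path inside the same working_dir.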
Execute the blast
Now that everything is ready, this is the easy part: we execute the Blast and return the result.
############################ Run the blast and retrieve the result ############################
output_file_name = 'output.txt'
# run the blast
result = shell_proxy.run([f"blastp -query {file_to_compare} -db {zebra_db} -out {output_file_name}"])
if result != 0:
    raise Exception('Error during the blast')
# get the absolute path of the output
output_file_path = os.path.join(shell_proxy.working_dir, output_file_name)
# create the output Resource (File)
output_file = File(output_file_path)
# return the output file
return {'blast_result': output_file}
Note: the result is the path of a file. We need to create a Resource if we want to save it, visualize it and possibly use it in another Task. We use the File resource and pass it the absolute path of the result.
Clean the data after the run
As a lot of files were created during this process (unzipping and database creation), it is good practice to clear the working directory after the run of the task.
To do this, we will implement the run_after_task method of the Task. Here is the documentation about it: https://constellab.community/bricks/gws_core/latest/doc/developer-guide/task/task
def run_after_task(self) -> None:
    # used to delete the temp folder once the task is done and the output resources are saved
    # it is safe to do it here because the output resource was moved to the Resource location
    if self.shell_proxy:
        self.shell_proxy.clean_working_dir()
To have access to the shell_proxy, we need to store it in an attribute during the run method:
# store the shell_proxy in the class to be able to use it in the run_after_task method
self.shell_proxy = shell_proxy
Note: we can't clear the data in the run method because the output resource is not saved yet. It is safe to do it in the run_after_task method because the output resource has already been saved and moved to another location.
Our task is done 💪. Let's see the complete code:
import os

from gws_core import (CondaShellProxy, ConfigParams, File, InputSpec,
                      MessageDispatcher, OutputSpec, PipShellProxy, Task,
                      TaskFileDownloader, TaskInputs, TaskOutputs,
                      task_decorator)
from gws_core.config.param.param_spec import IntParam, StrParam
from gws_core.impl.shell.shell_proxy import ShellProxy


class BlastEnvHelper():
    CONDA_ENV_DIR_NAME = "BlastCondaEnv"
    PIP_ENV_DIR_NAME = "BlastPipEnv"

    CONDA_ENV_FILE_PATH = os.path.join(os.path.abspath(
        os.path.dirname(__file__)), "blast_conda.yml")
    PIP_ENV_FILE_PATH = os.path.join(os.path.abspath(
        os.path.dirname(__file__)), "blast_pipenv.txt")

    @classmethod
    def create_conda_proxy(cls, message_dispatcher: MessageDispatcher = None) -> CondaShellProxy:
        return CondaShellProxy(cls.CONDA_ENV_DIR_NAME, cls.CONDA_ENV_FILE_PATH, message_dispatcher=message_dispatcher)

    @classmethod
    def create_pip_proxy(cls, message_dispatcher: MessageDispatcher = None) -> PipShellProxy:
        return PipShellProxy(cls.PIP_ENV_DIR_NAME, cls.PIP_ENV_FILE_PATH, message_dispatcher=message_dispatcher)


@task_decorator("Blast", human_name="Blast")
class Blast(Task):
    input_specs = {'input_file': InputSpec(File, human_name="Compressed fasta file",
                                           short_description="The fasta.gz file to compare with db")}
    output_specs = {'blast_result': OutputSpec(File, human_name="Blast result file",
                                               short_description="Result file generated by the blast command")}
    config_specs = {
        'db_path': StrParam(
            default_value="https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/zebrafish.1.protein.faa.gz",
            human_name="Database path", short_description="Must be a .faa.gz file"),
        'head': IntParam(default_value=0, human_name="Limit input file read",
                         short_description="Number of lines in the input file to compare. 0 for all lines")}

    shell_proxy: ShellProxy = None

    def run(self, params: ConfigParams, inputs: TaskInputs) -> TaskOutputs:
        """ Run the task """

        ############################ Download the database ############################
        # retrieve the db url from the param
        db_url: str = params['db_path']
        # extract the filename from the url
        db_file_name = db_url.split('/')[-1]
        # create the file downloader using the current task's brick name;
        # by passing the brick name of the Task, the file will be downloaded in a location specific to the brick
        # also pass the message_dispatcher to log download progress in the task messages
        file_downloader = TaskFileDownloader(Blast.get_brick_name(), self.message_dispatcher)
        # download the db and retrieve the path of the downloaded file
        zebra_zipped_db = file_downloader.download_file_if_missing(db_url, db_file_name)

        ############################ Create the shell proxy ############################
        shell_proxy = BlastEnvHelper.create_conda_proxy(self.message_dispatcher)
        # store the shell_proxy in the class to be able to use it in the run_after_task method
        self.shell_proxy = shell_proxy

        ############################ Prepare the DB ############################
        zebra_db = "zebra_fish_db.faa"
        # Unzip the db file in the working directory and verify the result
        result = shell_proxy.run([f"gunzip -c {zebra_zipped_db} > {zebra_db}"])
        if result != 0:
            raise Exception('Error during the unzip of the database .gz file')
        # Create the blast db in the working directory and check the result
        # all the shell commands are executed in the virtual environment, so makeblastdb is available
        result = shell_proxy.run([f"makeblastdb -in {zebra_db} -dbtype prot -out {zebra_db}"])
        if result != 0:
            raise Exception('Error during the creation of the blast db')

        ############################ Prepare the input file ############################
        # retrieve the input file
        file: File = inputs['input_file']
        # Unzip the input file in the working directory and verify the result
        input_file_unzipped = "input.faa"
        result = shell_proxy.run([f"gunzip -c {file.path} > {input_file_unzipped}"])
        if result != 0:
            raise Exception('Error during the unzip of the .gz file')

        # Limit the number of lines to compare if needed
        head: int = params['head']
        file_to_compare: str = None
        if head > 0:
            # use the head command to limit the number of lines
            sub_file = 'sub_input_file.faa'
            result = shell_proxy.run([f"head -n {head} {input_file_unzipped} > {sub_file}"])
            if result != 0:
                raise Exception('Error during the head command')
            file_to_compare = sub_file
        else:
            # no need to limit the number of lines
            file_to_compare = input_file_unzipped

        ############################ Run the blast and retrieve the result ############################
        output_file_name = 'output.txt'
        # run the blast
        result = shell_proxy.run([f"blastp -query {file_to_compare} -db {zebra_db} -out {output_file_name}"])
        if result != 0:
            raise Exception('Error during the blast')
        # get the absolute path of the output
        output_file_path = os.path.join(shell_proxy.working_dir, output_file_name)
        # create the output Resource (File)
        output_file = File(output_file_path)
        # return the output file
        return {'blast_result': output_file}

    def run_after_task(self) -> None:
        # used to delete the temp folder once the task is done and the output resources are saved
        # it is safe to do it here because the output resource was moved to the Resource location
        if self.shell_proxy:
            self.shell_proxy.clean_working_dir()
Execute the task
To execute the task, you will need to upload the input data in your lab and configure the task.
Here is a link to an example input file: https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/mouse.1.protein.faa.gz
In the config, set the db path to: https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/zebrafish.1.protein.faa.gz
And set head to 11, as in the Blast tutorial, to make the execution faster.
The first execution of the Task might take a while, as it will install the virtual environment. Once the virtual environment is installed, it will be reused.