
More advanced task

In this tutorial we will learn how to create an advanced task that executes a BLAST. We will see how to:

  • download a file from an external source
  • create a virtual environment
  • prepare the data
  • execute BLAST in the virtual environment
  • clean the data after the run


In this tutorial we will create a task that reproduces, in the Constellab environment, the following tutorial: https://angus.readthedocs.io/en/2019/running-command-line-blast.html#running-blast

Create the task

First, create a Task named Blast and define its input, output and config as follows:

@task_decorator("Blast", human_name="Blast")
class Blast(Task):

    input_specs = {'input_file': InputSpec(File, human_name="Compressed fasta file",
                                     short_description="The fasta.gz file to compare with db"),
                   }
    output_specs = {'blast_result': OutputSpec(File, human_name="Blast result file",
                                       short_description="Result file generated by the blast command")}

    config_specs = {
        'db_path': StrParam(
            default_value="https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/zebrafish.1.protein.faa.gz",
            human_name="Database path", short_description="Must be a .faa.gz file"),
        'head': IntParam(default_value=0, human_name="Limit input file read",
                         short_description="Number of lines in the input file to compare. 0 for all lines")}

This task takes a compressed fasta file as input and returns the BLAST output as result. It has two configuration parameters:

  • db_path: URL of the database to download and compare sequences against
  • head: used to limit the input size and speed up the BLAST. 0 means no limit.


Download the database

To make the Task flexible, we provide a config parameter to download the database from an external source. To download the database, we will use the TaskFileDownloader. This class simplifies the download of external sources and detects whether the file was already downloaded, to prevent re-downloading it. Here is the documentation about it: https://constellab.community/bricks/gws_core/latest/doc/developer-guide/task/task#use-external-data
Here is the code to download the database:

# Download the database

# retrieve the db url from the param
db_url: str = params['db_path']

# extract the filename from the url 
db_file_name = db_url.split('/')[-1]

# create the file downloader using the current task's brick name.
# By passing the brick name of the Task, the file is downloaded in a location specific to the brick.
# Also pass the message_dispatcher to log download progress in the task messages
file_downloader = TaskFileDownloader(Blast.get_brick_name(), self.message_dispatcher)

# download the db and retrieve the path of the downloaded file
zebra_zipped_db = file_downloader.download_file_if_missing(db_url, db_file_name) 

After this code, the database is downloaded and its path is available in zebra_zipped_db.
If we re-execute this task, the database will not be downloaded again as long as db_file_name doesn't change.
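Conceptually, this caching behavior can be sketched in plain Python. This is only an illustrative sketch, not the actual TaskFileDownloader implementation; the function name and destination folder are hypothetical:

```python
import os
import urllib.request


def download_if_missing(url: str, dest_dir: str, file_name: str) -> str:
    """Download url into dest_dir/file_name, skipping the download
    if a file with that name is already present (name-based cache)."""
    os.makedirs(dest_dir, exist_ok=True)
    dest_path = os.path.join(dest_dir, file_name)
    if os.path.exists(dest_path):
        # same file name as a previous run: reuse the cached copy
        return dest_path
    urllib.request.urlretrieve(url, dest_path)
    return dest_path
```

This is why changing the file name (for example because the URL changed) triggers a fresh download, while re-running with the same name reuses the cached file.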

Create the virtual environment

To run the blast we will need the blastp command line tool. We will install it in a Conda virtual environment so it does not interfere with the current environment. Here is the documentation about virtual environments: https://constellab.community/bricks/gws_core/latest/doc/developer-guide/virtual-environment-and-command-line
To create the environment we will use the CondaShellProxy class. We will delegate the environment creation to a class external to our Task, named BlastEnvHelper. Having an external class to manage the virtual environment is a good practice, as multiple Tasks can use the same environment.
Here is the BlastEnvHelper class:

class BlastEnvHelper():
    # define the name of the virtual environment, it must be unique
    CONDA_ENV_DIR_NAME = "BlastCondaEnv"

    # path of the yaml environment file. The file blast_conda.yml must be in the same folder as this file
    CONDA_ENV_FILE_PATH = os.path.join(os.path.abspath(
        os.path.dirname(__file__)), "blast_conda.yml")

    # method to create the conda shell proxy.
    # we pass the MessageDispatcher so that the output of the command line is logged in the Task
    @classmethod
    def create_conda_proxy(cls, message_dispatcher: MessageDispatcher = None) -> CondaShellProxy:
        return CondaShellProxy(cls.CONDA_ENV_DIR_NAME, cls.CONDA_ENV_FILE_PATH, message_dispatcher=message_dispatcher)

This requires a YAML environment file, named blast_conda.yml, in the same folder as the above file. Here is the content of the conda environment file, which defines the packages required to execute the blast.

channels:
  - bioconda
  - conda-forge
dependencies:
  - python=3.8
  - biopython
  - blast==2.13.0

Now we need to create the ShellProxy in our Task to be able to run commands in the virtual environment:

shell_proxy = BlastEnvHelper.create_conda_proxy(self.message_dispatcher)

We provide self.message_dispatcher so that the output of the commands executed by the proxy is logged in the Task.

Prepare the data

Now we will follow the BLAST tutorial to prepare the data. We need to:

  1. Unzip the database and create the blast database from the unzipped file
  2. Retrieve the input file, unzip it, get the first X lines based on the config and move the result in the working directory

Let's see the code :

############################ Prepare the DB ############################
zebra_db = "zebra_fish_db.faa"
# Unzip db file in the working directory and verify the result
result = shell_proxy.run([f"gunzip -c {zebra_zipped_db} > {zebra_db}"])
if result != 0:
    raise Exception('Error during the unzip of database .gz file')


# Create the blast db in the working directory and check the result
# all the shell command are executed in the virtual environment, so the makeblastdb is available
result = shell_proxy.run([f"makeblastdb -in {zebra_db} -dbtype prot -out {zebra_db}"])
if result != 0:
    raise Exception('Error during the creation of the blast db')

############################ Prepare the input file ############################
# retrieve the input file
file: File = inputs['input_file']

# Unzip the input file in the working directory and verify the result
input_file_unzipped = "input.faa"
result = shell_proxy.run([f"gunzip -c {file.path} > {input_file_unzipped}"])
if result != 0:
    raise Exception('Error during the unzip of .gz file')

# Limit the number of lines to compare if needed
head: int = params['head']

file_to_compare: str = None
# limit the number of lines
if head > 0:
    # use the head command to limit the number of lines
    sub_file = 'sub_input_file.faa'
    result = shell_proxy.run([f"head -n {head} {input_file_unzipped} > {sub_file}"])

    if result != 0:
        raise Exception('Error during the head command')
    file_to_compare = sub_file
else:
    # no need to limit the number of line
    file_to_compare = input_file_unzipped

After this code, the database is ready in the working directory (named zebra_db) and our input file is ready in the working directory (named file_to_compare).
Note: all the commands are run with the shell_proxy. This means they are all executed in the same working directory, which is why we can use relative file paths, and in the virtual environment, which is why the makeblastdb command is available.
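One caveat about the head step: head -n counts raw lines, so it can truncate the file in the middle of a FASTA record (a sequence is often split over several lines). A pure-Python sketch that keeps only whole records instead (illustrative only, not part of the tutorial code) could look like:

```python
def head_fasta_records(fasta_text: str, max_records: int) -> str:
    """Return at most max_records complete FASTA records.

    Each record starts with a '>' header line, so we count headers
    and stop before a record that would exceed the limit.
    """
    lines = []
    record_count = 0
    for line in fasta_text.splitlines():
        if line.startswith(">"):
            record_count += 1
            if record_count > max_records:
                break
        lines.append(line)
    return "\n".join(lines) + ("\n" if lines else "")
```

In practice, the line-based head used in the tutorial is fine for a quick demo (the original ANGUS tutorial does the same), but record-aware truncation avoids feeding blastp a partial sequence.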

Execute the blast

Now that everything is ready, this is the easy part: we execute the blast and return the result.

############################ Run the blast and retrieve result ############################
output_file_name = 'output.txt'
# run the blast
result = shell_proxy.run([f"blastp -query {file_to_compare} -db {zebra_db} -out {output_file_name}"])
if result != 0:
    raise Exception('Error during the blast')

# get the absolute path of the output
output_file_path = os.path.join(shell_proxy.working_dir, output_file_name)
# create the output Resource (File)
output_file = File(output_file_path)

# return the output file
return {'blast_result': output_file}

Note: the result is the path of a file. We need to create a Resource if we want to save it, visualize it and maybe use it in another Task. We use the File resource and pass the absolute path of the result.

Clean the data after the run

As many files were created during this process (unzipping and database creation), it is a good practice to clean the working directory after the task run.
To do this, we will implement the run_after_task method of the Task. Here is the documentation about it: https://constellab.community/bricks/gws_core/latest/doc/developer-guide/task/task

def run_after_task(self) -> None:
    # used to delete the temp folder once the task is done and output resources are saved
    # it is safe to do it here because the output resource was moved to the Resource location
    if self.shell_proxy:
        self.shell_proxy.clean_working_dir()

To have access to the shell_proxy, we need to store it in an attribute during the run method:

# store the shell_proxy in the class to be able to use it in the run_after_task method
self.shell_proxy = shell_proxy

Note: we can't clean the data in the run method because the output resource is not saved yet. It is safe to do it in the run_after_task method because the output resource was already saved and moved to another location.
Our task is done 💪. Let's see the complete code:

import os

from gws_core import (CondaShellProxy, ConfigParams, File, InputSpec,
                      MessageDispatcher, OutputSpec, PipShellProxy, Task,
                      TaskFileDownloader, TaskInputs, TaskOutputs,
                      task_decorator)
from gws_core.config.param.param_spec import IntParam, StrParam
from gws_core.impl.shell.shell_proxy import ShellProxy

class BlastEnvHelper():
    CONDA_ENV_DIR_NAME = "BlastCondaEnv"
    PIP_ENV_DIR_NAME = "BlastPipEnv"
    CONDA_ENV_FILE_PATH = os.path.join(os.path.abspath(
        os.path.dirname(__file__)), "blast_conda.yml")
    PIP_ENV_FILE_PATH = os.path.join(os.path.abspath(
        os.path.dirname(__file__)), "blast_pipenv.txt")

    @classmethod
    def create_conda_proxy(cls, message_dispatcher: MessageDispatcher = None) -> CondaShellProxy:
        return CondaShellProxy(cls.CONDA_ENV_DIR_NAME, cls.CONDA_ENV_FILE_PATH, message_dispatcher=message_dispatcher)

    @classmethod
    def create_pip_proxy(cls, message_dispatcher: MessageDispatcher = None) -> PipShellProxy:
        return PipShellProxy(cls.PIP_ENV_DIR_NAME, cls.PIP_ENV_FILE_PATH, message_dispatcher=message_dispatcher)

@task_decorator("Blast", human_name="Blast")
class Blast(Task):

    input_specs = {'input_file': InputSpec(File, human_name="Compressed fasta file",
                                           short_description="The fasta.gz file to compare with db"),
                   }
    output_specs = {'blast_result': OutputSpec(File, human_name="Blast result file",
                                               short_description="Result file generated by the blast command")}

    config_specs = {
        'db_path': StrParam(
            default_value="https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/zebrafish.1.protein.faa.gz",
            human_name="Database path", short_description="Must be a .faa.gz file"),
        'head': IntParam(default_value=0, human_name="Limit input file read",
                         short_description="Number of lines in the input file to compare. 0 for all lines")}

    shell_proxy: ShellProxy = None

    def run(self, params: ConfigParams, inputs: TaskInputs) -> TaskOutputs:
        """ Run the task """


        ############################ Download the database ############################
        # retrieve the db url from the param
        db_url: str = params['db_path']

        # extract the filename from the url
        db_file_name = db_url.split('/')[-1]

        # create the file downloader using the current task's brick name.
        # By passing the brick name of the Task, the file is downloaded in a location specific to the brick.
        # Also pass the message_dispatcher to log download progress in the task messages
        file_downloader = TaskFileDownloader(Blast.get_brick_name(), self.message_dispatcher)

        # download the db and retrieve the path of the downloaded file
        zebra_zipped_db = file_downloader.download_file_if_missing(db_url, db_file_name)

        ############################ Create the shell proxy ############################
        shell_proxy = BlastEnvHelper.create_conda_proxy(
            self.message_dispatcher)
        # store the shell_proxy in the class to be able to use it in the run_after_task method
        self.shell_proxy = shell_proxy

        ############################ Prepare the DB ############################
        zebra_db = "zebra_fish_db.faa"
        # Unzip db file in the working directory and verify the result
        result = shell_proxy.run([f"gunzip -c {zebra_zipped_db} > {zebra_db}"])
        if result != 0:
            raise Exception('Error during the unzip of database .gz file')

        # Create the blast db in the working directory and check the result
        # all the shell command are executed in the virtual environment, so the makeblastdb is available
        result = shell_proxy.run([f"makeblastdb -in {zebra_db} -dbtype prot -out {zebra_db}"])
        if result != 0:
            raise Exception('Error during the creation of the blast db')

        ############################ Prepare the input file ############################
        # retrieve the input file
        file: File = inputs['input_file']

        # Unzip the input file in the working directory and verify the result
        input_file_unzipped = "input.faa"
        result = shell_proxy.run([f"gunzip -c {file.path} > {input_file_unzipped}"])
        if result != 0:
            raise Exception('Error during the unzip of .gz file')

        # Limit the number of lines to compare if needed
        head: int = params['head']

        file_to_compare: str = None
        # limit the number of lines
        if head > 0:
            # use the head command to limit the number of lines
            sub_file = 'sub_input_file.faa'
            result = shell_proxy.run([f"head -n {head} {input_file_unzipped} > {sub_file}"])

            if result != 0:
                raise Exception('Error during the head command')
            file_to_compare = sub_file
        else:
            # no need to limit the number of line
            file_to_compare = input_file_unzipped

        ############################ Run the blast and retrieve result ############################
        output_file_name = 'output.txt'
        # run the blast
        result = shell_proxy.run([f"blastp -query {file_to_compare} -db {zebra_db} -out {output_file_name}"])

        if result != 0:
            raise Exception('Error during the blast')

        # get the absolute path of the output
        output_file_path = os.path.join(
            shell_proxy.working_dir, output_file_name)
        # create the output Resource (File)
        output_file = File(output_file_path)

        # return the output file
        return {'blast_result': output_file}

    def run_after_task(self) -> None:
        # used to delete the temp folder once the task is done and output resources are saved
        # it is safe to do it here because the output resource was moved to the Resource location
        if self.shell_proxy:
            self.shell_proxy.clean_working_dir()

Execute the task

To execute the task, you need to upload the input data into your lab and configure the task.
Here is a link to an example input file: https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/mouse.1.protein.faa.gz
In the config, set the db path to: https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/zebrafish.1.protein.faa.gz
And set head to 11, as in the BLAST tutorial, to make the execution faster.
The first execution of the Task might take a while as it installs the virtual environment. Once the virtual environment is installed, it is re-used.