
More advanced task

In this tutorial we will learn how to create a more advanced task that runs a Blast search. We will see how to:


  • download a file from an external source
  • create a virtual environment
  • prepare the data
  • execute blast in the virtual environment
  • clean the data after the run


            The task reproduces, in the Constellab environment, the steps of the following command-line tutorial: https://angus.readthedocs.io/en/2019/running-command-line-blast.html#running-blast



            Create the task


            First, create a Task named Blast and define its inputs, outputs and config as follows:


            @task_decorator("Blast", human_name="Blast")
            class Blast(Task):
                input_specs = {'input_file': InputSpec(File, human_name="Compressed fasta file",
                                                 short_description="The fasta.gz file to compare with db"),
                               }
                output_specs = {'blast_result': OutputSpec(File, human_name="Blast result file",
                                                   short_description="Result file generated by the blast command")}
                config_specs = {
                    'db_path': StrParam(
                        default_value="https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/zebrafish.1.protein.faa.gz",
                        human_name="Database path", short_description="Must be a .faa.gz file"),
                    'head': IntParam(default_value=0, human_name="Limit input file read",
                                     short_description="Number of line in the input file to compare. 0 for all lines")}
            

            This task takes a compressed fasta file as input and returns the blast output as its result. It has 2 configuration parameters:


            • db_path: URL of the database to download; the input sequences are compared against it
            • head: limits the number of lines read from the input file to make the blast faster (0 reads all lines)


                Download the database


                To keep the Task flexible, we provide a config parameter to download the database from an external source. To download the database, we will use the TaskFileDownloader. It simplifies downloading from an external source and detects whether the file was already downloaded, which avoids downloading it again. Here is the documentation about this: Use external data


                Here is the code to download the database:


                # Download the database
                # retrieve the db url from the param
                db_url: str = params['db_path']
                # extract the filename from the url 
                db_file_name = db_url.split('/')[-1]
                # create the file downloader using the brick name of the current task:
                # with the brick name, the file is downloaded to a location specific to the brick
                # also pass the message_dispatcher to log the download progress in the task messages
                file_downloader = TaskFileDownloader(Blast.get_brick_name(), self.message_dispatcher)
                # download the db and retrieve the path of the downloaded file
                zebra_zipped_db = file_downloader.download_file_if_missing(db_url, db_file_name) 
                

                After this code, the database is downloaded and the path of the downloaded file is stored in zebra_zipped_db.
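
To make the caching idea concrete, here is a minimal standard-library sketch of a "download only if missing" helper. It is an illustration, not the TaskFileDownloader implementation: the names file_name_from_url and download_if_missing, the dest_dir argument and the injectable fetch function are all assumptions made for this example. It also derives the file name with urlparse, so that a query string at the end of a URL would not end up in the file name.

```python
import os
import posixpath
import urllib.request
from typing import Callable
from urllib.parse import urlparse


def file_name_from_url(url: str) -> str:
    """Return the last path segment of a URL, ignoring any query string."""
    return posixpath.basename(urlparse(url).path)


def download_if_missing(url: str, dest_dir: str,
                        fetch: Callable[[str, str], object] = urllib.request.urlretrieve) -> str:
    """Download url into dest_dir unless the file is already there.

    Hypothetical helper for illustration only; this is not gws_core code.
    """
    os.makedirs(dest_dir, exist_ok=True)
    dest_path = os.path.join(dest_dir, file_name_from_url(url))
    if not os.path.exists(dest_path):
        # first call: fetch the file and store it under dest_dir
        fetch(url, dest_path)
    # later calls: the existing file is reused as-is
    return dest_path
```

Passing fetch as a parameter keeps the helper testable without network access; by default it falls back to urllib.request.urlretrieve.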




                Create the virtual environment


                To run the blast we need the blastp command-line tool. We will install it in a Conda virtual environment so that it does not interfere with the current environment. Here is the documentation about virtual environments: Virtual environment and command line.


                To create the environment we will use the CondaShellProxy class. We will delegate the creation of the environment to a class outside our Task, named BlastEnvHelper. Having an external class to manage the virtual environment is good practice, because multiple Tasks can then use the same environment.


                Here is the BlastEnvHelper class:


                class BlastEnvHelper():
                    # define the name of the virtual environment, it must be unique
                    CONDA_ENV_DIR_NAME = "BlastCondaEnv"
                    # path of the yaml environment file. The file blast_conda.yml must be in the same folder as this file
                    CONDA_ENV_FILE_PATH = os.path.join(os.path.abspath(
                        os.path.dirname(__file__)), "blast_conda.yml")
                    # method to create the conda shell proxy. 
                    # we pass the MessageDispatcher so that the output of the command line will be logged in the Task
                    @classmethod
                    def create_conda_proxy(cls, message_dispatcher: MessageDispatcher = None) -> CondaShellProxy:
                        return CondaShellProxy(cls.CONDA_ENV_DIR_NAME, cls.CONDA_ENV_FILE_PATH, message_dispatcher=message_dispatcher)
                

                This requires a yaml environment file named blast_conda.yml, located in the same folder as the file above. Here is the content of the conda environment file, which defines the packages required to execute the blast:


                channels:
                  - bioconda
                  - conda-forge
                dependencies:
                  - python=3.8
                  - biopython
                  - blast==2.13.0
                

                Now we need to create the ShellProxy in our Task to be able to run commands in the virtual environment:


                shell_proxy = BlastEnvHelper.create_conda_proxy(self.message_dispatcher)
                


                Prepare the data


                Now we will follow the blast tutorial to prepare the data for the blast. We need to:


                1. Unzip the database and create the blast database from the unzipped file
                2. Retrieve the input file, unzip it, keep the first X lines based on the config and write the result in the working directory

                    Let's see the code:


                    ############################ Prepare the DB ############################
                    zebra_db = "zebra_fish_db.faa"
                    # Unzip db file in the working directory and verify the result
                    result = shell_proxy.run([f"gunzip -c {zebra_zipped_db} > {zebra_db}"])
                    if result != 0:
                        raise Exception('Error during the unzip of database .gz file')
                    # Create the blast db in the working directory and check the result
                    # all the shell command are executed in the virtual environment, so the makeblastdb is available
                    result = shell_proxy.run([f"makeblastdb -in {zebra_db} -dbtype prot -out {zebra_db}"])
                    if result != 0:
                        raise Exception('Error during the creation of the blast db')
                    ############################ Prepare the input file ############################
                    # retrieve the input file
                    file: File = inputs['input_file']
                    # Unzip the input file in the working directory and verify the result
                    input_file_unzipped = "input.faa"
                    result = shell_proxy.run([f"gunzip -c {file.path} > {input_file_unzipped}"])
                    if result != 0:
                        raise Exception('Error during the unzip of .gz file')
                    # Limit the number of lines to compare if needed
                    head: int = params['head']
                    file_to_compare: str = None
                    # limit the number of lines
                    if head > 0:
                        # use the head command to limit the number of lines
                        sub_file = 'sub_input_file.faa'
                        result = shell_proxy.run([f"head -n {head} {input_file_unzipped} > {sub_file}"])
                        if result != 0:
                            raise Exception('Error during the head command')
                        file_to_compare = sub_file
                    else:
                        # no need to limit the number of lines
                        file_to_compare = input_file_unzipped
                    

                    After this code, the blast database is ready in the working directory under the name stored in zebra_db, and the input file is ready in the working directory under the name stored in file_to_compare.
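
For readers less familiar with the shell, the sketch below shows in plain Python what the gunzip -c and head -n commands do to the input file. It is only an illustration and is not used by the task; the function name gunzip_head and its parameters are assumptions made for this example.

```python
import gzip
from itertools import islice


def gunzip_head(gz_path: str, out_path: str, head: int = 0) -> int:
    """Decompress gz_path into out_path, keeping the first `head` lines.

    A head of 0 keeps every line. Returns the number of lines written.
    """
    written = 0
    with gzip.open(gz_path, "rt") as src, open(out_path, "w") as dst:
        # limit the iteration only when a positive head is requested
        lines = islice(src, head) if head > 0 else src
        for line in lines:
            dst.write(line)
            written += 1
    return written
```

As with head -n, the cut is made by line and not by sequence, so the last sequence kept may be incomplete.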




                    Execute the blast


                    Now that everything is ready, this is the easy part: we need to execute the blast and return the result.


                    ############################ Run the blast and retrieve the result ############################
                    output_file_name = 'output.txt'
                    # run the blast
                    result = shell_proxy.run([f"blastp -query {file_to_compare} -db {zebra_db} -out {output_file_name}"])
                    if result != 0:
                        raise Exception('Error during the blast')
                    # get the absolute path of the output
                    output_file_path = os.path.join(shell_proxy.working_dir, output_file_name)
                    # create the output Resource (File)
                    output_file = File(output_file_path)
                    # return the output file
                    return {'blast_result': output_file}
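
The commands in this tutorial all follow the same pattern: run the command, then raise if the return value is not 0. One possible way to factor this out is sketched below. It assumes, as in the tutorial code, that run takes a list of commands and returns an exit code; run_or_raise, SupportsRun and FakeShellProxy are hypothetical names, and the fake proxy exists only so that the sketch can run without gws_core.

```python
from typing import List, Protocol


class SupportsRun(Protocol):
    """Anything with a run() method returning an exit code (assumed interface)."""

    def run(self, cmd: List[str]) -> int: ...


def run_or_raise(shell_proxy: SupportsRun, cmd: str, error_message: str) -> None:
    """Run cmd through the proxy and raise if the exit code is not 0."""
    result = shell_proxy.run([cmd])
    if result != 0:
        raise Exception(f"{error_message} (exit code {result})")


class FakeShellProxy:
    """Stand-in used only to demonstrate the helper; it executes nothing."""

    def __init__(self, exit_code: int) -> None:
        self.exit_code = exit_code
        self.commands: List[str] = []

    def run(self, cmd: List[str]) -> int:
        # record the command instead of executing it
        self.commands.extend(cmd)
        return self.exit_code
```

With such a helper, each step of the task would shrink to a single call, for example run_or_raise(shell_proxy, command, 'Error during the blast').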
                    


                    Clean the data after the run


                    As a lot of files were created during this process (unzip and database creation), it is good practice to clear the working directory after the task has run.


                    To do this, we will implement the run_after_task method of the Task. Here is the documentation about it: Run after task


                    def run_after_task(self) -> None:
                        # used to delete the temp folder once the task is done and the output resources are saved
                        # it is safe to do it here because the output resource was moved to the Resource location
                        if self.shell_proxy:
                            self.shell_proxy.clean_working_dir()
                    

                    To have access to the shell_proxy, we need to store it in an attribute during the run method:


                    # store the shell_proxy in the class to be able to use it in the run_after_task method
                    self.shell_proxy = shell_proxy
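
The pattern used here (keep a reference during run, clean up in run_after_task) can be illustrated without gws_core. The sketch below is a simplified stand-in built on tempfile and shutil; WorkingDirDemo is a hypothetical class and does not reproduce how Task or ShellProxy manage their working directory.

```python
import os
import shutil
import tempfile
from typing import Optional


class WorkingDirDemo:
    """Hypothetical stand-in for a task; it only mimics the run / run_after_task order."""

    def __init__(self) -> None:
        self.working_dir: Optional[str] = None

    def run(self) -> str:
        # create a temporary working directory and keep a reference to it
        self.working_dir = tempfile.mkdtemp()
        output_path = os.path.join(self.working_dir, "output.txt")
        with open(output_path, "w") as f:
            f.write("result\n")
        return output_path

    def run_after_task(self) -> None:
        # nothing to clean if run() never created the directory
        if self.working_dir and os.path.isdir(self.working_dir):
            shutil.rmtree(self.working_dir)
            self.working_dir = None
```

In the real task the output resource has already been moved to the Resource location when run_after_task is called; the demo has no such step, so its output file is removed together with the directory.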
                    


                    Our task is done 💪. Let's see the complete code:


                    import os
                    from gws_core import (CondaShellProxy, ConfigParams, File, InputSpec,
                                          MessageDispatcher, OutputSpec, PipShellProxy, Task,
                                          TaskFileDownloader, TaskInputs, TaskOutputs,
                                          task_decorator)
                    from gws_core.config.param.param_spec import IntParam, StrParam
                    from gws_core.impl.shell.shell_proxy import ShellProxy


                    class BlastEnvHelper():
                        CONDA_ENV_DIR_NAME = "BlastCondaEnv"
                        PIP_ENV_DIR_NAME = "BlastPipEnv"
                        CONDA_ENV_FILE_PATH = os.path.join(os.path.abspath(
                            os.path.dirname(__file__)), "blast_conda.yml")
                        PIP_ENV_FILE_PATH = os.path.join(os.path.abspath(
                            os.path.dirname(__file__)), "blast_pipenv.txt")
                        
                        @classmethod
                        def create_conda_proxy(cls, message_dispatcher: MessageDispatcher = None) -> CondaShellProxy:
                            return CondaShellProxy(cls.CONDA_ENV_DIR_NAME, cls.CONDA_ENV_FILE_PATH, message_dispatcher=message_dispatcher)
                        
                        @classmethod
                        def create_pip_proxy(cls, message_dispatcher: MessageDispatcher = None) -> PipShellProxy:
                            return PipShellProxy(cls.PIP_ENV_DIR_NAME, cls.PIP_ENV_FILE_PATH, message_dispatcher=message_dispatcher)
                    
                    
                    @task_decorator("Blast", human_name="Blast")
                    class Blast(Task):
                        input_specs = {'input_file': InputSpec(File, human_name="Compressed fasta file",
                                                               short_description="The fasta.gz file to compare with db"),
                                       }
                        output_specs = {'blast_result': OutputSpec(File, human_name="Blast result file",
                                                                   short_description="Result file generated by the blast command")}
                        config_specs = {
                            'db_path': StrParam(
                                default_value="https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/zebrafish.1.protein.faa.gz",
                                human_name="Database path", short_description="Must be a .faa.gz file"),
                            'head': IntParam(default_value=0, human_name="Limit input file read",
                                             short_description="Number of line in the input file to compare. 0 for all lines")}
                        shell_proxy: ShellProxy = None

                        def run(self, params: ConfigParams, inputs: TaskInputs) -> TaskOutputs:
                            """ Run the task """
                            ############################ Download the database ############################
                            # retrieve the db url from the param
                            db_url: str = params['db_path']
                            # extract the filename from the url
                            db_file_name = db_url.split('/')[-1]
                            # create the file downloader using the brick name of the current task:
                            # with the brick name, the file is downloaded to a location specific to the brick
                            # also pass the message_dispatcher to log the download progress in the task messages
                            file_downloader = TaskFileDownloader(Blast.get_brick_name(), self.message_dispatcher)
                            # download the db and retrieve the path of the downloaded file
                            zebra_zipped_db = file_downloader.download_file_if_missing(db_url, db_file_name)
                            ############################ Create the shell proxy ############################
                            shell_proxy = BlastEnvHelper.create_conda_proxy(
                                self.message_dispatcher)
                            # store the shell_proxy in the class to be able to use it in the run_after_task method
                            self.shell_proxy = shell_proxy
                            ############################ Prepare the DB ############################
                            zebra_db = "zebra_fish_db.faa"
                            # Unzip db file in the working directory and verify the result
                            result = shell_proxy.run([f"gunzip -c {zebra_zipped_db} > {zebra_db}"])
                            if result != 0:
                                raise Exception('Error during the unzip of database .gz file')
                            # Create the blast db in the working directory and check the result
                            # all the shell command are executed in the virtual environment, so the makeblastdb is available
                            result = shell_proxy.run([f"makeblastdb -in {zebra_db} -dbtype prot -out {zebra_db}"])
                            if result != 0:
                                raise Exception('Error during the creation of the blast db')
                            ############################ Prepare the input file ############################
                            # retrieve the input file
                            file: File = inputs['input_file']
                            # Unzip the input file in the working directory and verify the result
                            input_file_unzipped = "input.faa"
                            result = shell_proxy.run([f"gunzip -c {file.path} > {input_file_unzipped}"])
                            if result != 0:
                                raise Exception('Error during the unzip of .gz file')
                            # Limit the number of lines to compare if needed
                            head: int = params['head']
                            file_to_compare: str = None
                            # limit the number of lines
                            if head > 0:
                                # use the head command to limit the number of lines
                                sub_file = 'sub_input_file.faa'
                                result = shell_proxy.run([f"head -n {head} {input_file_unzipped} > {sub_file}"])
                                if result != 0:
                                    raise Exception('Error during the head command')
                                file_to_compare = sub_file
                            else:
                                # no need to limit the number of lines
                                file_to_compare = input_file_unzipped
                            ############################ Run the blast and retrieve the result ############################
                            output_file_name = 'output.txt'
                            # run the blast
                            result = shell_proxy.run([f"blastp -query {file_to_compare} -db {zebra_db} -out {output_file_name}"])
                            if result != 0:
                                raise Exception('Error during the blast')
                            # get the absolute path of the output
                            output_file_path = os.path.join(
                                shell_proxy.working_dir, output_file_name)
                            # create the output Resource (File)
                            output_file = File(output_file_path)
                            # return the output file
                            return {'blast_result': output_file}

                        def run_after_task(self) -> None:
                            # used to delete the temp folder once the task is done and the output resources are saved
                            # it is safe to do it here because the output resource was moved to the Resource location
                            if self.shell_proxy:
                                self.shell_proxy.clean_working_dir()
                    

                    Execute the task


                    To execute the task, you will need to upload the input data to your lab and configure the task.


                    Here is a link to an example input file: https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/mouse.1.protein.faa.gz


                    In the config, set the db path to: https://storage.gra.cloud.ovh.net/v1/AUTH_a0286631d7b24afba3f3cdebed2992aa/opendata/gws_academy/zebrafish.1.protein.faa.gz


                    Finally, set head to 11, as in the Blast tutorial, to make the execution faster.