Multi-Step Self-Service Pipeline Protocol Example

The example in this topic shows a self-service pipeline protocol that consists of three steps. The second and third steps are dummy steps created to show the required file structure and how the slugs must match one another; they contain no code that actually processes the data files.

File Structure

The following is the file structure used in this example.

multi_step_demo/
├── code/
│   ├── multi_step_protocol/
│   │   ├── protocol.json
│   │   └── script.js
│   ├── step2_enrichment/
│   │   ├── config.json
│   │   ├── main.py
│   │   ├── Pipfile
│   │   └── requirements.txt
│   ├── step3_other_logic/
│   │   ├── config.json
│   │   ├── main.py
│   │   ├── Pipfile
│   │   └── requirements.txt
│   └── commands.txt
└── Pipfile

Protocol Definition

The protocol is defined in the protocol.json and script.js files.

protocol.json

This file describes the entire protocol, declares the steps to run, and defines the configuration settings that are available to the user.

{
    "protocolSchema": "v2",
    "name": "Multi-step pipeline v2 demo",
    "description": "Demonstration of how to use multi-step pipelines",
    "steps": [
        {
            "slug": "first-step-fluoroskan-raw-to-ids",
            "description": "Convert Fluoroskan raw input to IDS JSON",
            "type": "generator",
            "script": {
                "namespace": "common",
                "slug": "thermofisher-fluoroskan-raw-to-ids",
                "version": "v1.0.0"
            },
            "functionSlug": "thermofisher-fluoroskan-raw-to-ids"
        },
        {
            "slug": "second-step-jdoe-enrichment-helper",
            "description": "Perform some enrichment on a raw IDS file",
            "type": "generator",
            "script": {
                "namespace": "private-tetrascience",
                "slug": "jdoes-enrichment-helper-task-identifier",
                "version": "v1.0.0"
            },
            "functionSlug": "jdoe-step2-enrichment-helper"
        },
        {
            "slug": "third-step-jdoe-extra-logic",
            "description": "Perform some more business logic",
            "type": "generator",
            "script": {
                "namespace": "private-tetrascience",
                "slug": "jdoes-business-logic",
                "version": "v1.0.0"
            },
            "functionSlug": "jdoe-step3-other-logic"
        }
    ],
    "config": [
        {
            "slug": "some-config-param",
            "name": "Some random user setting",
            "description": "Some necessary config parameter",
            "type": "string",
            "required": true,
            "step": "second-step-jdoe-enrichment-helper"
        },
        {
            "slug": "business-critical-value",
            "name": "Super important setting",
            "description": "A business critical value we want to keep hidden in the UI",
            "type": "secret",
            "required": true,
            "step": "third-step-jdoe-extra-logic"
        }
    ]
}

Note that there are three steps indicated in the protocol.json file.

  1. first-step-fluoroskan-raw-to-ids converts Fluoroskan raw input to IDS JSON. This step leverages an existing TetraScience task script (thermofisher-fluoroskan-raw-to-ids in the common namespace).
  2. second-step-jdoe-enrichment-helper is used to enrich the raw IDS file.
  3. third-step-jdoe-extra-logic performs more business logic.

📘 NOTE

The second and third steps in this topic are dummy steps that were created to show you how to structure files so that you can have more than one step in your protocol. They contain no additional code that performs real work on the data.
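
To summarize how the identifiers line up across the files in this example:

  1. steps[].slug in protocol.json (for example, second-step-jdoe-enrichment-helper) is the name passed to workflow.runTask in script.js.
  2. steps[].functionSlug in protocol.json must match a functions[].slug in that step's config.json, which in turn maps to the Python entry point (for example, main.jdoe_enrichment_helper).
  3. config[].slug in protocol.json (for example, some-config-param) is the key that the task script uses to read the setting at run time.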

script.js

This file controls the order in which the steps run, as well as how the instrument input files (if any) and any step configuration details are passed to each step.

async workflow => {
    // this is the raw Fluoroskan output file from the instrument
    const rawFluoroData = workflow.getContext('inputFile');

    const pipelineConfig = workflow.getContext('pipelineConfig');

    // the first argument to runTask is the steps[].slug
    // defined in the master script's protocol.json

    // for most raw to IDS pipelines (which are on protocolSchema v1),
    // you will need to pass the input directly in as the second argument
    const fluoroskanIDS = await workflow.runTask(
        'first-step-fluoroskan-raw-to-ids',
        rawFluoroData
    );

    // when you have multiple inputs into a task script,
    // you can pass in a dictionary. The keys can be whatever you want,
    // but the Python task script will have to use the same keys
    const enrichedIDS = await workflow.runTask(
        'second-step-jdoe-enrichment-helper',
        {
            fluoro_input_file: fluoroskanIDS,
            any_key_can_be_used: pipelineConfig
        }
    );

    // with JavaScript shorthand properties, the variable name itself
    // becomes the key (here, "pipelineConfig")
    const finalResult = await workflow.runTask(
        'third-step-jdoe-extra-logic',
        {
            enriched_input_file: enrichedIDS,
            pipelineConfig
        }
    );

    return finalResult;
};
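
Note that each workflow.runTask call resolves to whatever the step returns; for the task scripts below, that is the file reference returned by context.write_file. Passing that reference into the next step's input dictionary is what chains the steps together, and the receiving task script reads the contents with context.read_file.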

Step 1 File(s)

The first step of the protocol has already been prepared by TetraScience, so no further files are needed for it. You only need to reference the existing script and provide its function name, which in this case is functionSlug: thermofisher-fluoroskan-raw-to-ids.

You can find the function name by looking at the protocol.json file of the pipeline protocol that you want to reuse.

Step 2 File(s)

config.json (step 2)

This file declares the task script's language and maps each function slug to its Python entry point. The slug here must match the functionSlug given for this step in protocol.json (jdoe-step2-enrichment-helper).

{
    "language": "python",
    "functions": [
        {
            "slug": "jdoe-step2-enrichment-helper",
            "function": "main.jdoe_enrichment_helper"
        }
    ]
}

main.py (step 2)

This file contains the actual Python code that processes the data for the step. There can be more than one Python file associated with a step.

import json

# the parameters for the task script main entry point
# are always "input" and "context" by convention
def jdoe_enrichment_helper(input: dict, context: object):

    # grab all pipeline config settings as a dictionary
    pipeline_config = input["any_key_can_be_used"]
    print(f"second step 'pipeline_config' contents: {pipeline_config}")

    # any variable name can be used here; context.read_file returns a dictionary
    # note the input key ("fluoro_input_file") has to match the key used in script.js
    the_input_file = context.read_file(input["fluoro_input_file"])
    print(f"second step 'the_input_file' contents: {the_input_file}")

    # to grab the actual data passed in, use the "body" key
    # for standard files which fit into memory (e.g. most IDS JSONs)
    # data is stored as bytes, and therefore must be decoded to a string
    data = json.loads(
        the_input_file["body"].decode("utf-8")
    )

    # grab a value from the IDS we passed in
    scaling_factor = data["methods"][0]["scaling_factor"]

    # create an output that we want to save to the datalake
    # note that the key used for "pipeline_config" is
    # the config.slug from the master script's protocol.json
    our_config_value = pipeline_config["some-config-param"]
    result = {
        "scaling_factor_from_ids": scaling_factor,
        "config_value": our_config_value
    }

    # write a file to the datalake
    return context.write_file(
        content=json.dumps(result, indent=2),
        file_name="jdoe_demo_step2_out.json",
        file_category="PROCESSED"
    )
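
If you want to exercise this function outside the platform, the following is a minimal local sketch. The StubContext class, the fake_ids_pointer value, and its "_contents" key are illustrative assumptions only; they are not part of the TetraScience SDK, and a real context object is supplied by the platform at run time.

import json

# assumes you run this next to step 2's main.py
from main import jdoe_enrichment_helper

# hypothetical stand-in for the platform-provided context object
class StubContext:
    def read_file(self, file_pointer: dict) -> dict:
        # the platform returns file contents as bytes under the "body" key
        return {"body": json.dumps(file_pointer["_contents"]).encode("utf-8")}

    def write_file(self, content: str, file_name: str, file_category: str) -> dict:
        print(f"would write {file_category} file '{file_name}':\n{content}")
        return {"fileName": file_name}

# fake input with only the one field this step reads;
# a real Fluoroskan IDS would contain much more
fake_ids_pointer = {"_contents": {"methods": [{"scaling_factor": 2.5}]}}

jdoe_enrichment_helper(
    input={
        "fluoro_input_file": fake_ids_pointer,
        "any_key_can_be_used": {"some-config-param": "hello"},
    },
    context=StubContext(),
)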

Step 3 File(s)

config.json (step 3)

This file declares the task script's language and maps each function slug to its Python entry point. The slug here must match the functionSlug given for this step in protocol.json (jdoe-step3-other-logic).

{
    "language": "python",
    "functions": [
        {
            "slug": "jdoe-step3-other-logic",
            "function": "main.other_logic_helper_func"
        }
    ]
}

main.py (step 3)

This file contains the actual Python code that processes the data for the step. There can be more than one Python file associated with a step.

import json

def other_logic_helper_func(input: dict, context: object):

    # grab all pipeline config settings as a dictionary
    pipeline_config = input["pipelineConfig"]
    print(f"third step 'pipeline_config' contents: {pipeline_config}")

    # any variable name can be used here; context.read_file returns a dictionary
    the_input_file = context.read_file(input["enriched_input_file"])
    print(f"third step 'the_input_file' contents: {the_input_file}")

    # to grab the actual data passed in, use the "body" key
    # for standard files which fit into memory (e.g. most IDS JSONs)
    # data is stored as bytes, and therefore must be decoded to a string
    data = json.loads(
        the_input_file["body"].decode("utf-8")
    )

    important_data = data["scaling_factor_from_ids"]

    # read the secret config value; the argument is the
    # config.slug from the master script's protocol.json
    crucial_config = context.get_secret_config_value("business-critical-value")

    result = f"{important_data},{crucial_config}"

    # write a file to the datalake (doesn't have to be JSON)
    return context.write_file(
        content=result,
        file_name="jdoe_demo_step3_out.csv",
        file_category="PROCESSED"
    )
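
The same hypothetical harness idea applies here, with one extra stub for the secret config value. The sketch below is an assumption for illustration only: it reuses the StubContext class from the step 2 sketch, assumes you run it next to step 3's main.py, and feeds step 2's output into step 3 the way script.js does.

# assumes you run this next to step 3's main.py
from main import other_logic_helper_func

class StubContextStep3(StubContext):
    # hypothetical stand-in; on the platform, the argument is the
    # config.slug of a "secret" type setting in protocol.json
    def get_secret_config_value(self, slug: str) -> str:
        return {"business-critical-value": "s3cr3t"}[slug]

# the dictionary that step 2 wrote to jdoe_demo_step2_out.json
step2_output = {"_contents": {"scaling_factor_from_ids": 2.5, "config_value": "hello"}}

other_logic_helper_func(
    input={
        "enriched_input_file": step2_output,
        "pipelineConfig": {"some-config-param": "hello"},
    },
    context=StubContextStep3(),
)
# would write jdoe_demo_step3_out.csv containing: 2.5,s3cr3t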