Create and Test Scripts

📘

Prerequisites

These instructions explain how to create and test self-service pipeline scripts. They also describe the sample protocol and task scripts that are part of the TS SDK, and provide information about artifacts.

Create and Test Scripts

To create and test the scripts:

  1. Edit the following files in the newly created folder as needed - these files will be your scripts and tests:
task-script
├── __test__
│   ├── data
│   │   ├── expected.json
│   │   └── input.txt
│   ├── test_business_logic.py
│   └── test_integration.py
└── main.py

As you edit, refer to the Script Walkthroughs section below.

  2. In your terminal, run the following to test your code:
$ pipenv run python -m pytest

# or

$ pipenv shell
$ python -m pytest

Script Walkthroughs

This section provides a walkthrough of the scripts provided with the TS SDK.

To run a data processing script on the Tetra Data Platform (TDP), you need to create a protocol and at least one task script. A protocol can have one or more steps. Each step is a piece of logic you want to perform, and each step invokes a function defined in a task script. Task scripts are written in Python.

Sample Task Script

When you initialized the TS SDK, you created several folders including the task script folder.

A task script folder contains the following files:

  • config.json - required, describes all the entry-point functions invokable from the protocol.
  • Python scripts - required, contain your business logic. By convention, the entry-point file is named main.py.
  • README.md - provides documentation on the scripts.
  • requirements.txt - specifies the third-party Python modules needed.

Here is an illustrative example showing config.json, main.py, and README.md in turn:

{
  "language": "python",
  "functions": [
    {
      "slug": "process_file",         // (1)
      "function": "main.process_file" // (2)
    }
  ]
}

def process_file(input: dict, context: object):
    """
    Logic:
    1. Get input file length
    2. Get offset from pipeline config
    3. Write a text file to Data Lake

    Args:
        input (dict): input dict passed from master script
        context (object): context object

    Returns:
        None
    """
    print("Starting task")
    
    input_data = context.read_file(input["inputFile"]) # 1
    length = len(input_data["body"])
    offset = int(input["offset"])                      # 2
    context.write_file(                                # 3
        content=f"length + offset is {length + offset}",
        file_name="len.txt",
        file_category="PROCESSED"
    )
    
    print("Task completed")
Basic demo task script
======================

config.json

This file exposes all the functions you want to invoke from protocols.

For each object in the functions array, slug (1) is a name you define that is used to invoke the function from the protocol. It must be unique within this task script. By convention, the slug is the same as the name of the Python function.

function (2) is a reference to the Python function, prefixed with the module where it is defined and separated by a dot (.). In this case, it refers to the process_file function in the main.py module.
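
The two rules above (unique slugs, and a module.function reference) can be checked locally before a build. The following is a minimal sketch, not part of the TS SDK: the helper name check_config is hypothetical, and it assumes the (1) and (2) annotation comments have been removed from config.json, because standard JSON does not allow comments.

```python
import json


def check_config(config_text: str) -> dict:
    """Return a {slug: (module, function)} map for a config.json document.

    Hypothetical helper for illustration; not part of the TS SDK.
    """
    config = json.loads(config_text)
    resolved = {}
    for entry in config["functions"]:
        slug = entry["slug"]
        if slug in resolved:
            raise ValueError(f"duplicate slug: {slug}")
        # "main.process_file" -> module "main", function "process_file"
        module_name, _, function_name = entry["function"].rpartition(".")
        if not module_name or not function_name:
            raise ValueError(f"expected module.function, got: {entry['function']}")
        resolved[slug] = (module_name, function_name)
    return resolved


EXAMPLE = """
{
  "language": "python",
  "functions": [
    {"slug": "process_file", "function": "main.process_file"}
  ]
}
"""

if __name__ == "__main__":
    print(check_config(EXAMPLE))
```

Splitting on the last dot keeps the sketch usable if the module reference itself contains dots, which is an assumption about how nested modules would be written.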

main.py

In this example, process_file is the entrypoint to the main business logic. We can see there are two arguments passed in - input and context.

input is defined in the protocol and input["inputFile"] is a reference to a file in the data lake (more in Protocol section).

context provides necessary APIs for the task script to interact with the TDP. You can see in this example that:

  • It first reads the file using the context.read_file function (1).
  • Then, it gets the offset from the input object (2). Note: offset is passed in from the protocol (more in Protocol section).
  • Lastly, it writes a new file to the Data Lake using the context.write_file function (3). You can find all context API definitions in the Context API doc.
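
Because process_file only touches the platform through context, its logic can be exercised locally with a stand-in object. The sketch below is one possible shape for a test in test_business_logic.py. FakeContext is a hypothetical class, not part of the TS SDK, and the string used as the file reference is an assumption made for the test; on the platform, input["inputFile"] is supplied by the protocol. The function body is copied from the sample above so the block runs on its own.

```python
class FakeContext:
    """Hypothetical stand-in for the TDP context object, for local tests only."""

    def __init__(self, files: dict):
        self.files = files   # maps a file reference to its raw body
        self.written = []    # records every write_file call

    def read_file(self, file_ref):
        # Mirrors the shape used in main.py: a dict with a "body" entry.
        return {"body": self.files[file_ref]}

    def write_file(self, content, file_name, file_category):
        self.written.append(
            {"content": content, "file_name": file_name, "file_category": file_category}
        )


def process_file(input: dict, context: object):
    """Copy of the sample entry point from main.py, without the print calls."""
    input_data = context.read_file(input["inputFile"])
    length = len(input_data["body"])
    offset = int(input["offset"])
    context.write_file(
        content=f"length + offset is {length + offset}",
        file_name="len.txt",
        file_category="PROCESSED",
    )


def test_process_file_adds_offset_to_length():
    context = FakeContext({"raw/input.txt": b"hello"})
    process_file({"inputFile": "raw/input.txt", "offset": 3}, context)
    assert context.written == [
        {
            "content": "length + offset is 8",
            "file_name": "len.txt",
            "file_category": "PROCESSED",
        }
    ]
```

In a real test file you would import process_file from main instead of copying it, and pytest would collect the test function automatically.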

Now that we have created a straightforward task script, we can create a protocol.

requirements.txt

You will most likely use some third-party Python modules in your Python scripts. You will need to create a requirements.txt file and put it in the root of your task script folder (at the same level as config.json). Then during the build process, the builder will install those packages as well.

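To generate requirements.txt, if you are using pipenv, you can run:

pipenv lock -r > requirements.txt

Newer pipenv releases replace this with pipenv requirements > requirements.txt; use that form if pipenv lock no longer accepts -r.

If you are using pip, you can run: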

pip freeze > requirements.txt

If you are not familiar with how the requirements.txt file works, there are many resources online, including https://packaging.python.org.

Sample Protocol Script

When you initialized the TS SDK, you created several folders and files, including the folder that holds the protocol files.

A protocol folder requires two files:

  • script.js - required, describes the order in which to invoke the steps in the protocol and how to pass information between them.
  • protocol.json - required, describes the pipeline steps and pipeline configuration parameters.

Here's an example protocol:

{
  "protocolSchema": "v2",    // protocol schema version, please use "v2"
  "name": "offset length",   // protocol name displayed on UI
  "description": "A demo",   // protocol description displayed on UI
  "steps": [
    {
      "slug": "process_file",
      "description": "Get file content length, add offset, write ",
      "type": "generator",
      "script": {
        "namespace": "private-<your_org_slug>",
        "slug": "offset-length",
        "version": "v1.0.0"
      },
      "functionSlug": "process_file"
    }
  ],
  "config": [
    {
      "slug": "offset",       // variable name referred in task script
      "name": "offset",       // human-readable name displayed on UI
      "type": "number",       // enum ["string", "boolean", "number", "object", "secret"]; more on "object" and "secret" in the Pipeline Configuration section
      "step": "process_file", // display which step this config belongs to on UI. Has to match one "slug" in "steps"
      "required": true        // If true, this config param is mandatory on UI
    }
  ]
}

/**
 * @param {Object} workflow
 * @param {function(path: string)} workflow.getContext Returns context prop by path
 * @param {function(slug: string, input: Object, overwrite: Object = {}): Promise<*>} workflow.runTask Runs a task
 * @param {{info: Function, warn: Function, error: Function}} utils.logger
 * @returns {Promise<*>}
 */
async (workflow, { logger }) => {
  // custom logic starts
  logger.info('here we start the workflow...');
  
  const { offset } = workflow.getContext('pipelineConfig');
  await workflow.runTask(
    "process_file",                                   // (1)   
    {                       
      inputFile: workflow.getContext("inputFile"),    // (2)
      offset,                                         // (3)
      // other things you want to pass to task script
    }
  );
};

protocol.json

The example above illustrates the most common usage pattern - a protocol that runs a single step - process_file.

The meanings of protocolSchema, name, and description are given in the code comments.

Let's look at steps. It is an array of objects, and each object is a step in the protocol. The key fields are:

  • slug - Name you define for this step.
  • type - Please use "generator".
  • script - Contains information about the task script. Using this information, the protocol can refer to the task script used:
      • namespace - Each organization on TDP has its own namespace, prefixed with "private-". If your org slug is "tetra", your namespace is "private-tetra".
      • slug - Name of your task script folder.
      • version - Your task script version, used for version control.
  • functionSlug - Function you want to invoke from the task script, as defined in config.json in the task script folder.
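
The relationships between these fields can be sanity-checked before deploying. The following is a minimal sketch, not part of the TS SDK: check_protocol is a hypothetical helper, and treating every key field listed above as required is an assumption. It also applies the rule from the protocol.json comments that each config entry's step has to match a step slug. The protocol is passed as an already-parsed dict, since the annotated example contains comments that a standard JSON parser rejects.

```python
REQUIRED_STEP_KEYS = ("slug", "type", "script", "functionSlug")
REQUIRED_SCRIPT_KEYS = ("namespace", "slug", "version")


def check_protocol(protocol: dict) -> list:
    """Return a list of problems found in a protocol.json-style dict.

    Hypothetical helper for illustration; not part of the TS SDK.
    """
    problems = []
    steps = protocol.get("steps", [])
    step_slugs = {step.get("slug") for step in steps}
    for index, step in enumerate(steps):
        for key in REQUIRED_STEP_KEYS:
            if key not in step:
                problems.append(f"steps[{index}] is missing '{key}'")
        for key in REQUIRED_SCRIPT_KEYS:
            if key not in step.get("script", {}):
                problems.append(f"steps[{index}].script is missing '{key}'")
    for item in protocol.get("config", []):
        # Each config entry must point at one of the declared step slugs.
        if item.get("step") not in step_slugs:
            problems.append(
                f"config '{item.get('slug')}' refers to unknown step '{item.get('step')}'"
            )
    return problems


PROTOCOL = {
    "protocolSchema": "v2",
    "name": "offset length",
    "description": "A demo",
    "steps": [
        {
            "slug": "process_file",
            "type": "generator",
            "script": {
                "namespace": "private-tetra",  # example org slug from the text
                "slug": "offset-length",
                "version": "v1.0.0",
            },
            "functionSlug": "process_file",
        }
    ],
    "config": [
        {"slug": "offset", "name": "offset", "type": "number",
         "step": "process_file", "required": True}
    ],
}

if __name__ == "__main__":
    print(check_protocol(PROTOCOL) or "no problems found")
```

Returning a list of messages rather than raising on the first problem lets you see every mismatch in one run.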

Please refer to How to Use Namespaces, orgSlugs, and Version to Determine Permissions/Access to learn more about how namespaces, orgSlugs, and versions together specify who has access to the files you create.

script.js

The script does two things: it gets the pipeline configuration and runs the task.

The first argument to the function is workflow. The workflow object passed to the protocol is analogous to the context object passed to the task script - it lets you interact with the platform. It supports the following APIs:

  • workflow.getContext("inputFile") - Returns the input file that triggered the pipeline.
  • workflow.getContext("pipelineConfig") - Returns pipeline config object.
  • workflow.runTask(stepSlug, input) - Runs a specific task script. It uses the step slug from protocol.json. input is a dict that contains anything you want to pass into the function defined in the step.

Note: You can also get pipeline config from the context object in the task script. However, we recommend passing the pipeline config from the protocol to make the task script less dependent on context. It's also easier to write tests for your task script.

logger can be used to log messages you want to display on the UI when a workflow runs. The logger supports four levels: "debug", "info", "warn", and "error".

How to Use Namespaces, orgSlugs, and Version to Determine Permissions/Access

When you reference or deploy an artifact, such as a task script file, you'll need to indicate a namespace, orgSlug, and version. The combination of the namespace, orgSlug, and version indicates where the artifact is stored in S3 and who has permission to access it. The same combination is also used to identify and organize other artifacts, such as IDS schemas or other files.

Namespaces and permissions are discussed in detail in this topic.

📘

NOTE:

If you are a customer who is building and deploying a Self-Service Pipeline, only use the private namespace.

📘

NOTE:

Only admins of the organization can deploy artifacts.

