Create and Test Scripts
Prerequisites
- These instructions are part of the self-service pipeline documentation. Before you begin, read the Self-Service Pipeline Overview.
- Before you create and test scripts, you should set up your environment and initialize the TS SDK.
These instructions explain how to create and test self-service pipeline scripts. They also provide details about the sample protocol and task scripts that are part of the TS SDK, as well as information about artifacts.
Create and Test Scripts
To create and test the scripts:
- Edit the following files in the newly created folder as needed - these files will be your scripts and tests:
task-script
├── __test__
│  ├── data
│  │  ├── expected.json
│  │  └── input.txt
│  ├── test_business_logic.py
│  └── test_integration.py
└── main.py
As you edit, reference the script walkthroughs in the Code Walkthrough section below.
- In your terminal, run the following to test your code:
$ pipenv run python -m pytest
# or
$ pipenv shell
$ python -m pytest
Code Walkthrough
This section provides a walkthrough of the scripts provided with the TS SDK.
To run a data processing script on the Tetra Data Platform (TDP), you need to create a protocol and at least one task script. A protocol can have one or more steps; each step is a piece of logic you want to perform, and each step invokes a function defined in a task script. Task scripts are written in Python.
NOTE:
An example of a multi-step pipeline protocol is available in this topic.
To better understand how slugs are used with Self-Service Pipelines, read the Slugs and Self-Service Pipelines topic.
Sample Task Script
When you initialized the TS SDK, you created several folders including the task script folder.
A task script folder contains the following files:
- config.json - required, describes all the entry-point functions invokable from the protocol.
- Python scripts - required, contain your business logic. Conventionally, the entry-point file is named main.py.
- README.md - provides documentation on the scripts.
- requirements.txt - specifies the third-party Python modules needed.
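Taken together with the __test__ folder shown earlier, a task script folder typically looks like the sketch below (layout shown for orientation only):
task-script
├── __test__
├── config.json
├── main.py
├── README.md
└── requirements.txt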
Here is an illustrative example:
{
  "language": "python",
  "functions": [
    {
      "slug": "process_file", // (1)
      "function": "main.process_file" // (2)
    }
  ]
}
def process_file(input: dict, context: object):
    """
    Logic:
        1. Get input file length
        2. Get offset from pipeline config
        3. Write a text file to Data Lake

    Args:
        input (dict): input dict passed from master script
        context (object): context object

    Returns:
        None
    """
    print("Starting task")
    input_data = context.read_file(input["inputFile"])  # 1
    length = len(input_data["body"])
    offset = int(input["offset"])  # 2
    context.write_file(  # 3
        content=f"length + offset is {length + offset}",
        file_name="len.txt",
        file_category="PROCESSED"
    )
    print("Task completed")
Basic demo task script
config.json
This file exposes all the functions you want to invoke from protocols. For each object in the functions array:
- slug (1) is a name you define that is used to invoke the function from the protocol. It must be unique within this task script. Conventionally, the slug is the same as the name of the Python function.
- function (2) is a reference to the Python function, including the module where it is defined, separated by a dot (.). In this case, it refers to the process_file function in the main.py module.
NOTE:
You can choose which Python version a task script uses by adding a "runtime" parameter to the script's config.json file. Python versions 3.7, 3.8, 3.9, 3.10, and 3.11 are currently supported.
Example "runtime" parameter:
{ "runtime": "python3.11" }
main.py
In this example, process_file is the entry point to the main business logic. Two arguments are passed in: input and context.
input is defined in the protocol, and input["inputFile"] is a reference to a file in the Data Lake (more in the Protocol section).
context provides the APIs the task script needs to interact with the TDP. In this example:
- It first reads the file using the context.read_file function (1).
- Then, it gets the offset from the input object (2). Note: offset is passed in from the protocol (more in the Protocol section).
- Lastly, it writes a new file to the Data Lake using the context.write_file function (3). You can find all context API definitions in the Context API doc.
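To make the walkthrough concrete, here is a minimal sketch of a unit test for process_file in the style of __test__/test_business_logic.py. The FakeContext class and its recorded-call attributes are illustrative stand-ins rather than part of the TS SDK, and the sketch assumes main.py is importable from the test module as in the generated template:
from main import process_file


class FakeContext:
    """Illustrative stand-in for the TDP context object."""

    def __init__(self, body):
        self._body = body
        self.written = []  # records write_file calls for assertions

    def read_file(self, file_pointer):
        # Mirrors the sample above, where read_file returns a dict with a "body" key
        return {"body": self._body}

    def write_file(self, content, file_name, file_category):
        self.written.append(
            {"content": content, "file_name": file_name, "file_category": file_category}
        )


def test_process_file_writes_length_plus_offset():
    context = FakeContext(body="hello")  # file content of length 5
    task_input = {"inputFile": {"fileId": "dummy"}, "offset": "3"}

    process_file(task_input, context)

    assert context.written[0]["file_name"] == "len.txt"
    assert context.written[0]["content"] == "length + offset is 8"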
Now that we have created a straightforward task script, we can create a protocol.
requirements.txt
You will most likely use some third-party Python modules in your Python scripts. You will need to create a requirements.txt file and put it in the root of your task script folder (at the same level as config.json). Then, during the build process, the builder will install those packages as well.
To generate requirements.txt, if you are using pipenv, you can run:
pipenv lock -r > requirements.txt
If you are using pip, you can run:
pip freeze > requirements.txt
If you are not familiar with how the requirements.txt file works, there are many resources online, including https://packaging.python.org.
Sample Protocol Script
When you initialized the TS SDK, you created several folders and files, including the folder that holds the protocol files.
A protocol folder requires two files:
- script.js - required, describes the order in which to invoke the different steps in the protocol and how to pass information across the steps.
- protocol.json - required, describes the pipeline steps and pipeline configuration parameters.
Here’s an example protocol:
{
  "protocolSchema": "v2", // protocol schema version, please use "v2"
  "name": "offset length", // protocol name displayed on UI
  "description": "A demo", // protocol description displayed on UI
  "steps": [
    {
      "slug": "process_file",
      "description": "Get file content length, add offset, write ",
      "type": "generator",
      "script": {
        "namespace": "private-<your_org_slug>",
        "slug": "offset-length",
        "version": "v1.0.0"
      },
      "functionSlug": "process_file"
    }
  ],
  "config": [
    {
      "slug": "offset", // variable name referred to in task script
      "name": "offset", // human-readable name displayed on UI
      "type": "number", // enum ["string", "boolean", "number", "object", "secret"], more on "object" and "secret" in the Pipeline Configuration section
      "step": "process_file", // which step this config belongs to on the UI; has to match one "slug" in "steps"
      "required": true // If true, this config param is mandatory on the UI
    }
  ]
}
/**
 * @param {Object} workflow
 * @param {function(path: string)} workflow.getContext Returns context prop by path
 * @param {function(slug: string, input: Object, overwrite: Object = {}): Promise<*>} workflow.runTask Runs a task
 * @param {Object} utils
 * @param {{info: Function, warn: Function, error: Function}} utils.logger
 * @returns {Promise<*>}
 */
async (workflow, { logger }) => {
  // custom logic starts
  logger.info('here we start the workflow...');
  const { offset } = workflow.getContext('pipelineConfig');
  await workflow.runTask(
    "process_file", // (1)
    {
      inputFile: workflow.getContext("inputFile"), // (2)
      offset, // (3)
      // other things you want to pass to task script
    }
  );
};
protocol.json
The example above illustrates the most common usage pattern: a protocol that runs a single step, process_file.
The meanings of protocolSchema, name, and description are commented in the code.
Let's look at steps. It's an array of objects, and each object is a step in the protocol. The meanings of the key fields are:
- slug - Name you define for this step.
- type - Use "generator".
- script - Contains information about the task script; using this information, the protocol can refer to the task script it uses. It contains:
  - namespace - Each organization on TDP has its own namespace, prefixed with "private-". If your org slug is "tetra", your namespace will be "private-tetra".
  - slug - Name of your task script folder.
  - version - Used for version control; it is your task script version.
- functionSlug - Function you want to invoke from the task script, as defined in config.json in the task script folder.
Please refer to How to Use Namespaces orgSlugs and Versions to Specify User Permissions/Access to learn more about how Namespaces, orgSlugs, and Versions together specify who has access to the files you create.
script.js
The script does two things: it gets the pipeline configuration and runs the task.
The first argument to the function is workflow. The workflow object passed to the protocol is analogous to the context object passed to the task script: it lets you interact with the platform. It supports the following APIs:
- workflow.getContext("inputFile") - Returns the input file that triggered the pipeline.
- workflow.getContext("pipelineConfig") - Returns the pipeline config object.
- workflow.runTask(stepSlug, input) - Runs a specific task script. It uses the step slug from protocol.json. input is a dict that contains anything you want to pass into the function defined in the step.
Note: You can also get the pipeline config from the context object in the task script. However, we recommend passing the pipeline config from the protocol to make the task script less dependent on context. It also makes it easier to write tests for your task script.
logger can be used for logging messages you want to display on the UI when a workflow is run. The logger supports four levels: "debug", "info", "warn", and "error".
Permissions/Access during Deployment
When you reference or deploy an artifact, such as a task script file, you’ll need to indicate a namespace, orgSlug, and version. The combination of the namespace, orgSlug, and version indicates where the artifact is stored in S3 and who has permission to access it. This same combination of a namespace, orgSlug, and version is also used to identify and organize other artifacts such as IDS schemas or other files.
See namespaces and permissions for more details.
NOTE:
If you are a customer who is building and deploying a Self-Service Pipeline, only use the private namespace.
NOTE:
Only admins of the organization can deploy artifacts.