Harmonize Data by Using SSPs: Map RAW Files to IDS Files
You can create self-service Tetra Data pipelines (SSPs) to parse proprietary instrument output files (RAW files) into a vendor-neutral and scientifically relevant Intermediate Data Schema (IDS).
This topic provides an example setup for mapping RAW files to IDS files by using an SSP.
Architecture
The following diagram shows an example SSP workflow for mapping RAW files to IDS files:
The diagram shows the following workflow:
- The “Hello, World!” SSP Example sspdemo-taskscript task script version is updated to v3.0.0. The config.json file has three exposed functions:
  - print-hello-world (main.print_hello_world), which is the print_hello_world function found in the main.py file.
  - decorate-input-file (main.decorate_input_file), which is the decorate_input_file function found in the main.py file.
  - harmonize-input-file (main.harmonize_input_file), which is the harmonize_input_file function found in the main.py file.
- A protocol named harmonize (v1.0.0) is created. The protocol.yml file provides the protocol name, description, and outlines one step: harmonize-input-file-step. This step points to the sspdemo-taskscript task script and the exposed function, harmonize-input-file. The input to this function is the input file that kicked off the pipeline workflow.
NOTE
For an example SSP folder structure, see SSP Folder Structure in the "Hello, World!" SSP Example.
Create and Deploy the Task Script
Task scripts are the building blocks of protocols, so you must build and deploy your task scripts before you can deploy a protocol that uses them.
Task scripts require the following:
- A config.json file that contains configuration information that exposes and makes your Python functions accessible so that protocols can use them.
- A Python file that contains Python functions (main.py in the following examples) that include the code that’s used in file processing.
- A requirements.txt file that either specifies any required third-party Python modules, or that is left empty if no modules are needed.
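As a quick local sanity check before deploying, the three requirements above could be verified with a few lines of Python. This is only a sketch: the helper name missing_task_script_files is hypothetical and is not part of the ts-sdk, and it checks file presence only, not file contents.

```python
from pathlib import Path
import tempfile

# Files that this topic lists as required for a task script.
REQUIRED_FILES = ("config.json", "main.py", "requirements.txt")


def missing_task_script_files(folder: Path) -> list[str]:
    """Return the required file names that are absent from `folder`."""
    return [name for name in REQUIRED_FILES if not (folder / name).is_file()]


# Demo in a throwaway directory so the sketch runs anywhere.
with tempfile.TemporaryDirectory() as tmp:
    demo_folder = Path(tmp)
    (demo_folder / "config.json").write_text("{}")
    (demo_folder / "main.py").write_text("")
    missing_before = missing_task_script_files(demo_folder)

    # An empty requirements.txt still counts as present.
    (demo_folder / "requirements.txt").write_text("")
    missing_after = missing_task_script_files(demo_folder)

print(missing_before, missing_after)
```

In practice you would pass your own task script folder instead of the temporary directory.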
To create and deploy a task script that transforms a raw data file into an IDS file, do the following.
NOTE
For more information about creating custom task scripts, see Task Script Files. For information about testing custom task scripts locally, see Create and Test Custom Task Scripts.
Create a config.json File
Create a config.json file in your code editor by using the following code snippet:
{
  "language": "python",
  "runtime": "python3.11",
  "functions": [
    {
      "slug": "print-hello-world",
      "function": "main.print_hello_world"
    },
    {
      "slug": "decorate-input-file",
      "function": "main.decorate_input_file"
    },
    {
      "slug": "harmonize-input-file",
      "function": "main.harmonize_input_file"
    }
  ]
}
NOTE
You can choose which Python version a task script uses by specifying the "runtime" parameter in the script's config.json file. Python versions 3.7, 3.8, 3.9, 3.10, and 3.11 are currently supported. If you don't include a "runtime" parameter, the script uses Python v3.7 by default.
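The structure of config.json can be checked locally before deployment. The following sketch is illustrative only: check_config is a hypothetical helper, and the runtime strings other than "python3.11" are an assumption extrapolated from the example value and the supported versions listed in the note.

```python
import json

# ASSUMPTION: runtime values follow the "python3.X" pattern of the example.
SUPPORTED_RUNTIMES = {
    "python3.7", "python3.8", "python3.9", "python3.10", "python3.11",
}

CONFIG_TEXT = """
{
  "language": "python",
  "runtime": "python3.11",
  "functions": [
    {"slug": "print-hello-world", "function": "main.print_hello_world"},
    {"slug": "decorate-input-file", "function": "main.decorate_input_file"},
    {"slug": "harmonize-input-file", "function": "main.harmonize_input_file"}
  ]
}
"""


def check_config(config: dict) -> list[str]:
    """Return human-readable problems found in a parsed config.json."""
    problems = []
    # The note above says Python v3.7 is used when "runtime" is omitted.
    runtime = config.get("runtime", "python3.7")
    if runtime not in SUPPORTED_RUNTIMES:
        problems.append(f"unsupported runtime: {runtime}")
    for entry in config.get("functions", []):
        slug = entry.get("slug", "")
        module, _, func = entry.get("function", "").rpartition(".")
        # Each entry needs a slug and a "module.function" target.
        if not slug or not module or not func:
            problems.append(f"malformed function entry: {entry}")
    return problems


problems = check_config(json.loads(CONFIG_TEXT))
print(problems)
```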
Update Your main.py File
Using the task script from Decorate Data by Using SSPs: Add Labels to Files, add the following elements to your task script:
import json

from ts_sdk.task.__task_script_runner import Context


def print_hello_world(input: dict, context: Context):
    print("Hello World!")
    return "Hello World!"


def decorate_input_file(input: dict, context: Context) -> dict:
    print("Start 'decorate_input_file' function...")
    input_file_pointer = input["input_file_pointer"]
    file_name = context.get_file_name(input_file_pointer)
    labels_json = input["labels_json"]
    added_labels = context.add_labels(
        file=input_file_pointer,
        labels=labels_json,
    )
    print("'decorate_input_file' completed")
    return input_file_pointer


def harmonize_input_file(input: dict, context: Context) -> dict:
    # Retrieve Input File Pointer from input arguments
    input_file_pointer = input["input_file_pointer"]

    # Open the file and bring in contents to parse

    # Extract fields and data relevant to the IDS
    # Order of fields analysisResult from biochemistry-analyzer IDS Details

    # Create IDS json
    ids_dict = {}  # Placeholder; the custom parser later in this topic fills it in

    # Save the file to S3 and save pointer to return
    saved_ids = context.write_file(
        content=json.dumps(ids_dict),
        file_name="ids_demo.json",
        file_category="IDS",
        ids="NAMESPACE/IDS_NAME:VERSION"
    )
    return saved_ids
NOTE
The main purpose of the harmonize_input_file function is to do the following:
- Retrieve Input File Pointer from input arguments
- Open the file and bring in contents to parse
- Extract fields and data relevant to the IDS
- Create IDS JSON
- Save the file to Amazon Simple Storage Service (Amazon S3) and save the pointer to return
In this way you are creating a relevant parser for your raw instrument data by using Python to convert the vendor-specific format into a vendor-neutral format.
Identify Instrument Data Structure and Mapping
To write the custom parser needed for this example setup, you need to identify the structure of the RAW instrument data, and understand how to map that data to the IDS.
To identify the structure of the RAW instrument data that you want to map to an IDS, do the following:
- Look at your file on your local system or view the file on the TDP. If you’ve uploaded the file onto the TDP, you can then Perform a Basic File Search and View a File Summary.
RAW File Example from a Biochemistry Analyzer (YSI Bioanalyzer 2900)
NOTE
For this example setup, copy the following contents into an example file to use with your custom parser.
{
"analysisResult": "DAILY_CHECKS TestBatch-2 R24_A02 Probe1A Glucose 12/11/2020 10:50 AM 23.08 52.9 mmol/L 1 mmol/L 1",
"timestamp": 1607701875000
}
- Find the IDS that you want to map to on the TDP. For instructions, see View IDSs and Their Details. For this example setup, open the biochemistry-analyzer IDS.
IDS File Example (biochemistry-analyzer)
If you select the ReadMe file for the biochemistry-analyzer IDS in the TDP, the instrument details have the following form:
<Experiment Name> <Batch Name> <Analyte Source Id> <Probe Id> <Chemistry> <Machine Date> <Machine Time> <Temperature in Celsius> <Concentration value> <Concentration units> <Analysis Error Code>
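To make the positional layout concrete, here is a minimal sketch that splits the example analysisResult string into named fields. The AnalysisResult dataclass and parse_analysis_result function are hypothetical names used for illustration. The sketch assumes that the date, time, and AM/PM marker occupy three whitespace-separated tokens and that the error code is the last token, which matches the example RAW file in this topic but may not hold for every instrument export.

```python
from dataclasses import dataclass
from datetime import datetime

SAMPLE = (
    "DAILY_CHECKS TestBatch-2 R24_A02 Probe1A Glucose "
    "12/11/2020 10:50 AM 23.08 52.9 mmol/L 1 mmol/L 1"
)


@dataclass
class AnalysisResult:
    experiment_name: str
    batch_name: str
    analyte_source_id: str
    probe_id: str
    chemistry: str
    measured_at: datetime
    temperature_c: float
    concentration: float
    concentration_unit: str
    error_code: str


def parse_analysis_result(line: str) -> AnalysisResult:
    """Split one analysisResult line into typed fields (sketch only)."""
    tokens = line.split()
    if len(tokens) < 12:
        raise ValueError(f"expected at least 12 tokens, got {len(tokens)}")
    # %I (12-hour clock) is needed for %p (AM/PM) to affect the parsed hour.
    measured_at = datetime.strptime(" ".join(tokens[5:8]), "%m/%d/%Y %I:%M %p")
    return AnalysisResult(
        experiment_name=tokens[0],
        batch_name=tokens[1],
        analyte_source_id=tokens[2],
        probe_id=tokens[3],
        chemistry=tokens[4],
        measured_at=measured_at,
        temperature_c=float(tokens[8]),
        concentration=float(tokens[9]),
        concentration_unit=tokens[10],
        error_code=tokens[-1],  # ASSUMPTION: error code is the final token
    )


result = parse_analysis_result(SAMPLE)
print(result)
```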
If you look at the Raw to IDS Mapping section, the fields are mapped in the following way:
- On the IDS Details page, in the ACTIONS section, select View expected.json. This provides an example of what the resulting JSON should look like once processing a file with your custom parser is complete.
Expected IDS JSON Example from a Biochemistry Analyzer (YSI Bioanalyzer 2900)
{
  "@idsType": "biochemistry-analyzer",
  "@idsNamespace": "common",
  "@idsVersion": "v1.0.0",
  "time": {
    "export": "2020-12-11T15:48:45.000Z"
  },
  "sample": {
    "batch": "TestBatch-8"
  },
  "results": [{
    "time": {
      "measurement": "2020-12-11T10:47:00"
    },
    "probe": "Probe3A",
    "temperature": {
      "value": 23.11,
      "unit": "DegreeCelsius"
    },
    "chemical_concentration": {
      "name": "glucose",
      "value": 15.5,
      "unit": "MillimolePerLiter"
    }
  }, {
    "time": {
      "measurement": "2020-12-11T10:47:00"
    },
    "probe": "Probe3B",
    "temperature": {
      "value": 23.11,
      "unit": "DegreeCelsius"
    },
    "chemical_concentration": {
      "name": "lactate",
      "value": 5.75,
      "unit": "MillimolePerLiter"
    }
  }]
}
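One way to use the expected JSON while developing a parser is to compare the shape of your output (keys and value types) against it. The sketch below is an informal check and not the TDP's schema validation. The key_shape helper is a hypothetical name, and the candidate values are taken from the example RAW file in this topic.

```python
def key_shape(value):
    """Reduce nested JSON data to its keys and leaf type names."""
    if isinstance(value, dict):
        return {key: key_shape(item) for key, item in value.items()}
    if isinstance(value, list):
        # Only the first element is inspected; lists are assumed homogeneous.
        return [key_shape(value[0])] if value else []
    return type(value).__name__


# Trimmed copy of the expected JSON above (one results entry is enough).
EXPECTED = {
    "@idsType": "biochemistry-analyzer",
    "@idsNamespace": "common",
    "@idsVersion": "v1.0.0",
    "time": {"export": "2020-12-11T15:48:45.000Z"},
    "sample": {"batch": "TestBatch-8"},
    "results": [{
        "time": {"measurement": "2020-12-11T10:47:00"},
        "probe": "Probe3A",
        "temperature": {"value": 23.11, "unit": "DegreeCelsius"},
        "chemical_concentration": {
            "name": "glucose", "value": 15.5, "unit": "MillimolePerLiter",
        },
    }],
}

# Candidate output built from the example RAW file.
candidate = {
    "@idsType": "biochemistry-analyzer",
    "@idsNamespace": "common",
    "@idsVersion": "v1.0.0",
    "time": {"export": "2020-12-11T15:51:15.000Z"},
    "sample": {"batch": "TestBatch-2"},
    "results": [{
        "time": {"measurement": "2020-12-11T10:50:00"},
        "probe": "Probe1A",
        "temperature": {"value": 23.08, "unit": "DegreeCelsius"},
        "chemical_concentration": {
            "name": "glucose", "value": 52.9, "unit": "MillimolePerLiter",
        },
    }],
}

shapes_match = key_shape(candidate) == key_shape(EXPECTED)
print(shapes_match)
```

Because leaf types are compared, this check also flags a common slip: writing numeric readings such as the temperature as strings instead of numbers.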
Update Your main.py File with a Custom Parser
Update your main.py file in your code editor by using the following code snippet:
NOTE
Make sure that you include the print_hello_world and decorate_input_file functions from the main.py file that you created for decorating data by using an SSP.
# Add new import statements
import datetime
import json

from ts_sdk.task.__task_script_runner import Context

# Include the print_hello_world and decorate_input_file functions here


# Add this new function
def harmonize_input_file(input: dict, context: Context) -> dict:
    print("Start 'harmonize_input_file' function...")
    input_file_pointer = input["input_file_pointer"]

    # Open the file and load its contents as JSON
    f = context.read_file(input_file_pointer, form="file_obj")
    raw_data = f["file_obj"].read().decode("utf-8")
    raw_json = json.loads(raw_data)

    # Information from raw file json
    # ASSUMPTION: there is only one reading and one json entry
    """
    {
      "analysisResult": "DAILY_CHECKS TestBatch-2 R24_A02 Probe1A Glucose 12/11/2020 10:50 AM 23.08 52.9 mmol/L 1 mmol/L 1",
      "timestamp": 1607701875000
    }
    """
    # Pull out the relevant fields from json
    analysisResult = raw_json["analysisResult"]
    timestamp = raw_json["timestamp"]

    # Order of fields analysisResult from biochemistry-analyzer IDS Details
    # ASSUMPTION: concentration units are always mmol/L
    """
    <Experiment Name> <Batch Name> <Analyte Source Id> <Probe Id> <Chemistry> <Machine Date> <Machine Time> <Temperature in Celsius> <Concentration value> <Concentration units> <Analysis Error Code>
    """
    analysisResult_items = analysisResult.split(" ")
    experiment_name = analysisResult_items[0]
    batch_name = analysisResult_items[1]
    analyte_source_id = analysisResult_items[2]
    probe_id = analysisResult_items[3]
    chemistry = analysisResult_items[4]
    # Use %I (12-hour clock): %p (AM/PM) has no effect when combined with %H
    machine_datetime = datetime.datetime.strptime(
        " ".join(analysisResult_items[5:8]), "%m/%d/%Y %I:%M %p"
    ).isoformat()
    # Convert readings to numbers so that they match the types in expected.json
    temp_in_c = float(analysisResult_items[8])
    concentration_value = float(analysisResult_items[9])
    analysis_error_code = analysisResult_items[13]
    # Interpret the epoch milliseconds as UTC so the result doesn't depend on the host time zone
    time_export = datetime.datetime.fromtimestamp(
        timestamp / 1000, tz=datetime.timezone.utc
    ).isoformat()

    # Create IDS dictionary
    """
    To get the structure, look at the expected.json file in the biochemistry-analyzer IDS Details
    """
    ids_dict = {}
    # Generic Info
    ids_dict["@idsNamespace"] = "common"
    ids_dict["@idsType"] = "biochemistry-analyzer"
    ids_dict["@idsVersion"] = "v1.0.0"
    # Export Time
    ids_dict["time"] = {"export": time_export}
    # Sample
    ids_dict["sample"] = {"batch": batch_name}
    # Results
    ids_results_item = {}
    ids_results_item["time"] = {"measurement": machine_datetime}
    ids_results_item["probe"] = probe_id
    # Unit spelling follows expected.json ("DegreeCelsius")
    ids_results_item["temperature"] = {"value": temp_in_c, "unit": "DegreeCelsius"}
    ids_results_item["chemical_concentration"] = {"name": chemistry, "value": concentration_value, "unit": "MillimolePerLiter"}
    ids_dict["results"] = [ids_results_item]

    # Save the file to S3 and save pointer to return
    saved_ids = context.write_file(
        content=json.dumps(ids_dict),
        file_name="ids_demo.json",
        file_category="IDS",
        ids="common/biochemistry-analyzer:v1.0.0"
    )
    print("'harmonize_input_file' completed")
    return saved_ids
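The two time conversions in the parser are easy to get subtly wrong, so here is a small standalone sketch of both. The function names are hypothetical. It relies on documented standard-library behavior: in strptime, the %p directive only affects the parsed hour when the hour is read with %I, and fromtimestamp returns local time unless a time zone is passed.

```python
from datetime import datetime, timezone


def parse_machine_time(date_text: str, time_text: str, meridiem: str) -> str:
    """Combine the instrument's date, 12-hour time, and AM/PM marker."""
    combined = f"{date_text} {time_text} {meridiem}"
    return datetime.strptime(combined, "%m/%d/%Y %I:%M %p").isoformat()


def export_time_utc(epoch_ms: int) -> str:
    """Convert epoch milliseconds to an ISO 8601 string in UTC."""
    return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).isoformat()


morning = parse_machine_time("12/11/2020", "10:50", "AM")
afternoon = parse_machine_time("12/11/2020", "02:15", "PM")
exported = export_time_utc(1607701875000)
print(morning, afternoon, exported)
```

With a 24-hour directive (%H) in place of %I, an afternoon reading such as 02:15 PM would be parsed as 02:15, which is why the 12-hour directive matters for this instrument format.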
Context API
In the Python code provided in this example setup, the Context API is used by importing it in the main.py file (from ts_sdk.task.__task_script_runner import Context). The Context section provides the necessary APIs for the task script to interact with the TDP.
This example setup uses the following Context API methods:
- context.read_file: Reads the contents of the input file that the file pointer refers to
- context.write_file: Writes an output file to the Tetra Data Lake
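For quick experiments outside the TDP, the Context calls used in this topic can be imitated with a small in-memory stand-in. Everything in the following sketch is hypothetical: FakeContext is not part of the ts-sdk, the file pointer shape ({"fileKey": ...}) is invented for the demo, and only the read_file(form="file_obj") and write_file call patterns shown in this topic are mimicked. For supported local testing, see Create and Test Custom Task Scripts.

```python
import io
import json


class FakeContext:
    """Hypothetical in-memory stand-in for the Context object (not ts-sdk)."""

    def __init__(self, files: dict):
        self._files = dict(files)  # maps a made-up file key to raw bytes
        self.written = []          # records every write_file call

    def read_file(self, file_pointer: dict, form: str = "file_obj") -> dict:
        if form != "file_obj":
            raise NotImplementedError("this stub only supports form='file_obj'")
        return {"file_obj": io.BytesIO(self._files[file_pointer["fileKey"]])}

    def write_file(self, content, file_name, file_category, ids=None) -> dict:
        self.written.append({
            "content": content,
            "file_name": file_name,
            "file_category": file_category,
            "ids": ids,
        })
        return {"fileKey": file_name}  # invented pointer shape


def echo_timestamp_as_ids(input: dict, context) -> dict:
    """Tiny task function: read RAW JSON, write a one-field output file."""
    handle = context.read_file(input["input_file_pointer"], form="file_obj")
    raw_json = json.loads(handle["file_obj"].read().decode("utf-8"))
    return context.write_file(
        content=json.dumps({"timestamp": raw_json["timestamp"]}),
        file_name="ids_demo.json",
        file_category="IDS",
        ids="common/biochemistry-analyzer:v1.0.0",
    )


raw_bytes = json.dumps({"timestamp": 1607701875000}).encode("utf-8")
fake = FakeContext({"raw/example.json": raw_bytes})
pointer = echo_timestamp_as_ids(
    {"input_file_pointer": {"fileKey": "raw/example.json"}}, fake
)
print(pointer, fake.written[0]["file_category"])
```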
Create a Python Package
Within the task script folder that contains the config.json and main.py files, use Python Poetry to create a Python package and the necessary files to deploy them to the TDP.
Poetry Command Example to Create a Python Package
poetry init
poetry add datetime
poetry export --without-hashes --format=requirements.txt > requirements.txt
NOTE
If no packages are added, this poetry export command example produces text in requirements.txt that you must delete to create an empty requirements.txt file. A requirements.txt file is required to deploy the package to the TDP.
Deploy the Task Script
To deploy the task script, run the following command from your command line (for example, bash):
ts-sdk put task-script private-{TDP ORG} sspdemo-taskscript v3.0.0 {task-script-folder} -c {auth-folder}/auth.json
NOTE
Make sure to replace {TDP ORG} with your organization slug, {task-script-folder} with the local folder that contains your task script code, and {auth-folder} with the local folder that contains your authentication information.
Also, when creating a new version of a task script and deploying it to the TDP, you must increase the version number. In this example command, the version is increased to v3.0.0.
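If you track versions in a script, the "increase the version number" rule can be expressed as a small comparison. This is a sketch under the assumption that versions always use the vMAJOR.MINOR.PATCH form seen in this topic. The helper names are hypothetical, and bumping the major component is just one of several ways to increase a version.

```python
import re

VERSION_PATTERN = re.compile(r"v(\d+)\.(\d+)\.(\d+)")


def parse_version(version: str) -> tuple:
    """Turn 'v3.0.0' into (3, 0, 0); reject anything else."""
    match = VERSION_PATTERN.fullmatch(version)
    if match is None:
        raise ValueError(f"not a vMAJOR.MINOR.PATCH version: {version!r}")
    return tuple(int(part) for part in match.groups())


def is_newer(candidate: str, deployed: str) -> bool:
    """True when `candidate` is strictly greater than `deployed`."""
    return parse_version(candidate) > parse_version(deployed)


def bump_major(version: str) -> str:
    """Increase the major component and reset minor and patch."""
    major, _, _ = parse_version(version)
    return f"v{major + 1}.0.0"


next_version = bump_major("v2.0.0")
print(next_version, is_newer(next_version, "v2.0.0"))
```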
Create and Deploy a Protocol
Protocols define the business logic of your pipeline by specifying the steps and the functions within task scripts that execute those steps. For more information about how to create a protocol, see Protocol YAML Files.
In the following example, there’s one step: harmonize-input-file-step. This step uses the harmonize-input-file function that’s in the sspdemo-taskscript task script, which is v3.0.0 of the task script.
Create a protocol.yml File
Create a protocol.yml file in your code editor by using the following code snippet:
protocolSchema: "v3"
name: "Harmonize - v3 protocol"
description: "Protocol that harmonizes biochemistry analyzer file (RAW to IDS)."
steps:
  - id: harmonize-input-file-step
    task:
      namespace: private-training-sspdemo
      slug: sspdemo-taskscript
      version: v3.0.0
      function: harmonize-input-file
    input:
      input_file_pointer: $( workflow.inputFile )
NOTE
When using a new task script version, you must use the new version number when calling that task script in the protocol step. This example protocol.yml file refers to v3.0.0.
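A mismatch between the protocol step and the deployed task script (wrong version, or a function slug that config.json doesn't expose) is a common source of deployment surprises. The following sketch cross-checks the two using plain dictionaries that mirror the example files. The find_step_problems helper is hypothetical, and loading the real YAML and JSON files is left out to keep the example dependency-free.

```python
# Mirrors the "functions" list from the example config.json.
CONFIG = {
    "functions": [
        {"slug": "print-hello-world", "function": "main.print_hello_world"},
        {"slug": "decorate-input-file", "function": "main.decorate_input_file"},
        {"slug": "harmonize-input-file", "function": "main.harmonize_input_file"},
    ]
}

# Mirrors the "steps" list from the example protocol.yml.
PROTOCOL = {
    "steps": [
        {
            "id": "harmonize-input-file-step",
            "task": {
                "namespace": "private-training-sspdemo",
                "slug": "sspdemo-taskscript",
                "version": "v3.0.0",
                "function": "harmonize-input-file",
            },
        }
    ]
}


def find_step_problems(protocol, config, task_script_slug, task_script_version):
    """List mismatches between protocol steps and one deployed task script."""
    exposed = {entry["slug"] for entry in config["functions"]}
    problems = []
    for step in protocol["steps"]:
        task = step["task"]
        if task["slug"] != task_script_slug:
            problems.append(f"{step['id']}: unknown task script {task['slug']}")
        if task["version"] != task_script_version:
            problems.append(f"{step['id']}: version {task['version']} is not deployed")
        if task["function"] not in exposed:
            problems.append(f"{step['id']}: function {task['function']} is not exposed")
    return problems


problems = find_step_problems(PROTOCOL, CONFIG, "sspdemo-taskscript", "v3.0.0")
print(problems)
```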
Deploy the Protocol
To deploy the protocol, run the following command from your command line (for example, bash):
ts-sdk put protocol private-{TDP ORG} harmonize v1.0.0 {protocol-folder} -c {auth-folder}/auth.json
NOTE
Make sure to replace {TDP ORG} with your organization slug, {protocol-folder} with the local folder that contains your protocol code, and {auth-folder} with the local folder that contains your authentication information.
NOTE
To redeploy the same version of your code, you must include the -f flag in your deployment command. This flag forces the code to overwrite the file. The following are example deployment commands:
ts-sdk put protocol private-xyz hello-world v1.0.0 ./protocol -f -c auth.json
ts-sdk put task-script private-xyz hello-world v1.0.0 ./task-script -f -c auth.json
For more details about the available arguments, run the following command:
ts-sdk put --help
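If you deploy from a script, the commands above can be assembled as an argument list. The sketch below only builds and prints the command and doesn't run it. The build_put_command helper is hypothetical, and it uses only the arguments and flags that appear in this topic's examples (-f and -c).

```python
import shlex


def build_put_command(kind, namespace, slug, version, folder, auth_file, force=False):
    """Assemble a `ts-sdk put` command as an argument list (not executed)."""
    if kind not in ("protocol", "task-script"):
        raise ValueError(f"unsupported artifact kind: {kind!r}")
    command = ["ts-sdk", "put", kind, namespace, slug, version, folder]
    if force:
        command.append("-f")  # overwrite an already deployed version
    command += ["-c", auth_file]
    return command


protocol_command = build_put_command(
    "protocol", "private-xyz", "hello-world", "v1.0.0", "./protocol",
    "auth.json", force=True,
)
# shlex.join renders the list as a shell-safe string for logging.
print(shlex.join(protocol_command))
```

Passing the list to subprocess.run instead of printing it would execute the deployment, provided that ts-sdk is installed and on your PATH.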
Create a Pipeline That Uses the Deployed Protocol
To use your new protocol on the TDP, create a new pipeline that uses the protocol that you deployed. Then, upload a file that matches the pipeline’s trigger conditions (for this example setup, use the biochemistry-analyzer file contents provided).
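To produce a test file for upload, the example RAW contents from this topic can be written to disk with a few lines of Python. This is a convenience sketch: the file name is an arbitrary, hypothetical choice, and your pipeline's trigger conditions determine what name or path the uploaded file actually needs.

```python
import json
import tempfile
from pathlib import Path

# Example RAW contents copied from this topic.
RAW_EXAMPLE = {
    "analysisResult": (
        "DAILY_CHECKS TestBatch-2 R24_A02 Probe1A Glucose "
        "12/11/2020 10:50 AM 23.08 52.9 mmol/L 1 mmol/L 1"
    ),
    "timestamp": 1607701875000,
}


def write_raw_example(folder: Path, file_name: str = "biochemistry_analyzer_example.json") -> Path:
    """Write the example RAW JSON into `folder` and return the file path."""
    path = folder / file_name
    path.write_text(json.dumps(RAW_EXAMPLE, indent=2), encoding="utf-8")
    return path


# Demo in a temporary directory; use your own folder in practice.
with tempfile.TemporaryDirectory() as tmp:
    written_path = write_raw_example(Path(tmp))
    round_trip = json.loads(written_path.read_text(encoding="utf-8"))

print(written_path.name)
```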