Transform Tetra Data in the Lakehouse
To transform Tetra Data in the Data Lakehouse, create a Tetraflow Pipeline.
NOTE
The Data Lakehouse Architecture is available to all customers as part of an early adopter program (EAP) and will continue to be updated in future TDP releases. If you are interested in participating in the early adopter program, please contact your customer success manager (CSM).
Tetraflow Pipelines
Tetraflow Pipelines allow you to define and schedule data transformations in a familiar SQL language and generate custom, use case-specific Lakehouse tables that are optimized for downstream analytics applications. Tetraflow Pipelines also provide the option to schedule when they run, so you don't have to manage multiple pipelines with different file-specific trigger conditions.
Tetraflows define the business logic of your pipeline by specifying the the following SQL Workflow
steps in a tetraflow.yml
file:
source
: Defines the path to your data sourceprocessor
: Defines the path to your transformation logic in SQLsink
: Defines the target Delta Table(s)
For instructions on how to create and deploy a custom Tetraflow pipelines, see Create a Tetraflow Pipeline.
Architecture
The following diagram shows an example Tetraflow pipeline workflow:
The diagram shows the following workflow:
-
The Tetraflow pipeline is configured with a Scheduled Trigger and is set to run at a specific time.
-
The Tetraflow pipeline runs a predefined SQL workflow that defines the path to the data source (
source
), transformation logic (processor
), and the target Lakehouse table(s) (sink
). -
When processing is finished, output files are then indexed according to the predefined Lakehouse tables and stored in the Data Lakehouse. Through this process, the data becomes easily accessible through an AI/ML-ready data storage format that operates seamlessly across all major data and cloud platform vendors. The data also remains available through search in the TDP user interface, TetraScience API, and SQL queries.
Create a Tetraflow Pipeline
To create a Tetraflow Pipeline, first create a Tetraflow that defines your transformation logic in SQL. Then, create a new Pipeline that uses the deployed Tetraflow.
Prerequisites
- Python version 3.7 or higher
pip
Python package manager- The TetraScience Command Line Interface (ts-cli). To install the ts-cli, run the following command in your local terminal:
pip3 install tetrascience-cli
- Knowledge of Spark Structured Query Language (SQL) Syntax
Step 1: Create a Tetraflow Artifact
Create a Tetraflow artifact by doing the following:
-
In your local terminal, create a directory for your Tetraflow.
-
Navigate to the directory and run the following command:
ts-cli init tetraflow
The command creates a
tetraflow.yml
file in the directory. -
Edit the
tetraflow.yml
file based on your use case. Make sure that you define the following properties:Name
: Defines the name of the Tetraflow that appears in the Tetra Data Platform (TDP)config
: Defines the Tetraflow Pipeline's runtime settingsinput
(category: source
): Defines the path to your data sourcetransform
(category: processor
): Defines your transformation logic in SQLoutput
(category: sink
): Defines the target Delta Table(s)
tetraflow.yml
File Example
tetraflow.yml
File ExampletetraflowSchema: v1
name: My Tetraflow
options:
batchMode: true
framework: spark
config:
input:
label: "Input Table Name"
description: "Name of the table to read from"
type: string
required: true
output:
label: "Output Table Name"
description: "Name of the table to write to"
type: string
required: true
workflow:
input:
type: Delta
category: source
description: Read the input dataset
properties:
loadPath: $( table(config.input) )
transform:
type: Sql
category: processor
description: Transform the dataset
needs: input
properties:
sql: |
SELECT field_1, field_2 as config_input
FROM input
output:
type: Delta
category: sink
description: Write the transformed data out
needs: transform
properties:
savePath: $( table(config.output) )
mode: overwrite
config
Parameters
config
ParametersIn the config
property, enter the following information:
- In the
input:
section, forlabel:
Enter the source Intermediate Data Schema (IDS) table's name. - In the
output:
section, forlabel:
Enter a name for the target Lakehouse table (Delta table) that you want to create.
config
Property Example
config
Property Exampleconfig:
input:
label: "Input Table Name"
description: "Name of the table to read from"
type: string
required: true
output:
label: "Output Table Name"
description: "Name of the table to write to"
type: string
required: true
source
(input) Parameters
source
(input) ParametersIn the Workflow
section, for the input
(category: source
) property, enter the following information:
- Replace
input
with the name of the data source. - For
loadPath:
, enter the source IDS table name in the following format:$( table('table_name_v') )
(For example,loadPath: $( table('lcuv_empower_v16') )
)
NOTE
You can source data from multiple tables in the scope of a single workflow by adding more
input
code blocks.
source
(input) Property Example
source
(input) Property Exampleinput:
type: Delta
category: source
description: Read the input dataset
properties:
loadPath: $( table('table_name_v') )
processor
(transform) Properties
processor
(transform) PropertiesNOTE
You can process data with multiple SQL statements in the scope of a single workflow by adding more
processor
code blocks.
In the Workflow
section, for the transform
(category: processor
) property, enter the following information:
- Replace
transform
with a short name of the SQL statement. - For
description
, enter a short description of the SQL statement. - For
needs
, enterinput
. - For
sql
, enter transformation logic for your pipeline by creating one or more SQL statements in Spark SQL syntax. Make sure that the statements can operate on any of the tables that you defined in theinput
property.
IMPORTANT
For the initial Data Lakehouse Architecture (EAP), it's recommended that you apply data deduplication best practices to your transformation logic. Applying these patterns ensures that you're using the latest records in downstream datasets and retrieving the most current data in SQL query results when either running SQL queries on the new tables or setting up a Tetraflow pipeline.
processor
(transform) Property Example
processor
(transform) Property ExampleThe following transform
property example defines a processor that will extract the peaks
data from Empower, Chromeleon, and Unicorn IDS tables:
peaks:
type: Sql
category: processor
description: Harmonization peaks tables (silver)
needs: input
properties:
sql: |
WITH empower_peaks AS (
SELECT
file_id AS injection_uuid,
file_id AS result_uuid,
file_id AS peak_uuid,
file_id AS peak_id,
rp2.amount.value AS amount_value,
rp2.analyte AS analyte,
rp2.area.value AS area_value,
rp2.area.percent.value AS area_percent,
rpw2.value AS peak_width,
rp2.resolution.value AS resolution,
rp2.retention.time.value AS retention_time,
rp2.signal_to_noise.value AS signal_to_noise_ratio,
rp2.symmetry_factor.value AS symmetry_factor,
rp2.usp_tailing_factor.value AS usp_tailing
FROM (
SELECT file_id, rp , rpw
FROM empower
LATERAL VIEW EXPLODE(results.peaks) AS rp
LATERAL VIEW EXPLODE(rp.widths) AS rpw
)
LATERAL VIEW EXPLODE(rp) AS rp2
LATERAL VIEW EXPLODE(rpw) AS rpw2
),
chromeleon_peaks AS (
SELECT
file_id AS injection_uuid,
file_id AS result_uuid,
file_id AS peak_uuid,
file_id AS peak_id,
rp.quantification.amount.value AS amount_value,
rp.identification.name AS analyte,
rp.quantification.area.value AS area_value,
rp.quantification.area_percent.value AS area_percent,
rp.resolution.peak_width.base.value AS peak_width,
rp.resolution.calculated.value AS resolution,
rp.retention.time.value AS retention_time,
rp.statistics.signal_to_noise.value AS signal_to_noise_ratio,
rp.statistics.symmetry.value AS symmetry_factor,
rp.baseline.capacity_factor.value AS usp_tailing
FROM chromeleon
LATERAL VIEW EXPLODE(results) AS r
LATERAL VIEW EXPLODE(r.peaks) AS rp
),
unicorn_peaks AS (
SELECT
file_id AS injection_uuid,
file_id AS result_uuid,
file_id AS peak_uuid,
file_id AS peak_id,
'' AS amount_value,
rp.name AS analyte,
rp.area.value AS area_value,
rp.area_percent.value AS area_percent,
rp.width.value AS peak_width,
rp.resolution.value AS resolution,
'' AS retention_time,
'' AS signal_to_noise_ratio,
'' AS symmetry_factor,
'' AS usp_tailing
FROM akta
LATERAL VIEW EXPLODE(result.peaks) AS rp
)
SELECT * FROM empower_peaks
UNION ALL
SELECT * FROM chromeleon_peaks
UNION ALL
SELECT * FROM unicorn_peaks
sink
(output) Property
sink
(output) PropertyIn the Workflow
section, for the output
(category: sink
) property, enter the following information:
- For
needs
, enter the name of the required SQL statement input that you created in theprocessor
property. - For
savePath
, enter the target Lakehouse table name that you want to create in the following format:$( table('table_name_v') )
(For example,savePath: $( table('peaks') )
)
NOTE
You can transform source data into multiple Lakehouse tables in the scope of a single workflow by adding more
sink
code blocks.
Step 2: Deploy the Tetraflow Artifact
Deploy the Tetraflow artifact by doing the following:
- In your local terminal, navigate to the directory that contains the Tetraflow.
- Run the following command:
ts-cli publish --config ./path_to_config
Step 3: Create a Pipeline by Using the Deployed Tetraflow Artifact
To use your new Tetraflow in the TDP, create a new pipeline that uses the Tetraflow that you deployed.
When creating the pipeline, make sure that you do the following:
- When you select a protocol, choose Tetraflows from the upper right drop-down menu (prepopulated with the All value).
- Select the Tetraflow that you created from the list.
- When you select a trigger, enter a specific, recurring time, for the Tetraflow Pipeline to run on. The minimum time interval is a
minute
. The maximum time interval is ayear
.
- Configure the CLUSTER SIZE setting to define the size of the cluster that you want the TDP to spin up to run your Tetraflow Pipeline. For most use cases, the default Tiny cluster size is most appropriate.
IMPORTANT
Before selecting any cluster size other than the default Tiny setting, please contact your customer success manager (CSM) or account executive. They will help you determine if a larger cluster size is required for your use case.
For more information, see Set Up and Edit Pipelines.
Monitor Tetraflow Pipeline Processing
To monitor Tetraflow Pipeline Processing, files, workflows, and logs, do the following:
-
In the left navigation menu, choose Pipelines. Then, choose Workflow Processing. The Workflow Processing page appears.
-
To find a specific Tetraflow Pipeline, type the pipeline's name in the upper left search box. To filter your search by workflow date, enter a date range in the Start Date - End Date field. To filter by workflow status, select a status from the Select Status drop-down menu.
NOTE
The number of active workflows and historical workflows display near the top of the page. The active workflows count shows all workflows that are pending or currently being processed. The historical workflows count represents workflows that have been completed or have failed. If workflows are active, you can select the refresh button next to it to view the latest status.
-
Select a workflow. Two tabs appear in the right panel:
- The Pipeline tab provides a link that will take you to a page where you can edit your pipeline, scan for unprocessed files, create a bulk pipeline process job, cancel pending workflows, and view high-level triggers, and Tetraflow details.
- The Workflow tab provides links to open workflows and view workflow logs.
Run a Tetraflow Pipeline Manually
To run a Tetraflow Pipeline manually, do the following:
- Open the Workflow Processing page.
- Select the Tetraflow Pipeline that you want to run.
- Select the right Pipeline tab.
- Choose Run Now. The Tetraflow Pipeline that you selected runs.
Edit a Tetraflow Pipeline
To edit a Tetraflow Pipeline, do the following:
- Open the Workflow Processing page.
- Select the Tetraflow Pipeline that you want to edit.
- Select the right Pipeline tab.
- Choose Edit Pipeline. The Pipeline Manager page appears.
For more information about how to edit pipelines, see Set Up and Edit Pipelines.
Updated about 2 months ago