Transform Tetra Data in the Lakehouse

To transform Tetra Data in the Data Lakehouse, create a Tetraflow Pipeline.

📘

NOTE

The Data Lakehouse Architecture is available to all customers as part of an early adopter program (EAP) and will continue to be updated in future TDP releases. If you are interested in participating in the early adopter program, please contact your customer success manager (CSM).

Tetraflow Pipelines

Tetraflow Pipelines allow you to define and schedule data transformations in familiar SQL and generate custom, use case-specific Lakehouse tables that are optimized for downstream analytics applications. Tetraflow Pipelines also provide the option to schedule when they run, so you don't have to manage multiple pipelines with different file-specific trigger conditions.

Tetraflows define the business logic of your pipeline by specifying the following SQL workflow steps in a tetraflow.yml file:

  • source: Defines the path to your data source
  • processor: Defines the path to your transformation logic in SQL
  • sink: Defines the target Delta Table(s)
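
These steps are wired together in the workflow section of the tetraflow.yml file. The following minimal sketch shows how the three steps connect; the step and table names are placeholders, and a complete, annotated example appears in Create a Tetraflow Pipeline below.

workflow:
  input:
    type: Delta
    category: source
    properties:
      loadPath: $( table('source_table_v1') )   # IDS table to read from (placeholder name)

  transform:
    type: Sql
    category: processor
    needs: input                                # reads from the source step above
    properties:
      sql: |
        SELECT * FROM input                     -- transformation logic in Spark SQL

  output:
    type: Delta
    category: sink
    needs: transform                            # writes the result of the processor step
    properties:
      savePath: $( table('target_table') )      # Lakehouse (Delta) table to create (placeholder name)
      mode: overwrite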

For instructions on how to create and deploy a custom Tetraflow Pipeline, see Create a Tetraflow Pipeline.

Architecture

The following diagram shows an example Tetraflow pipeline workflow:

Tetraflow Pipeline diagram

The diagram shows the following workflow:

  1. The Tetraflow pipeline is configured with a Scheduled Trigger and is set to run at a specific time.

  2. The Tetraflow pipeline runs a predefined SQL workflow that defines the path to the data source (source), transformation logic (processor), and the target Lakehouse table(s) (sink).

  3. When processing is finished, output files are indexed according to the predefined Lakehouse tables and stored in the Data Lakehouse. Through this process, the data becomes easily accessible through an AI/ML-ready data storage format that operates seamlessly across all major data and cloud platform vendors. The data also remains available through search in the TDP user interface, the TetraScience API, and SQL queries.

Create a Tetraflow Pipeline

To create a Tetraflow Pipeline, first create a Tetraflow that defines your transformation logic in SQL. Then, create a new Pipeline that uses the deployed Tetraflow.

Prerequisites

To create and deploy a Tetraflow, you must have the TetraScience CLI (ts-cli) installed and configured to connect to your TDP organization.

Step 1: Create a Tetraflow Artifact

Create a Tetraflow artifact by doing the following:

  1. In your local terminal, create a directory for your Tetraflow.

  2. Navigate to the directory and run the following command:

    ts-cli init tetraflow

    The command creates a tetraflow.yml file in the directory.

  3. Edit the tetraflow.yml file based on your use case. Make sure that you define the following properties:

    • name: Defines the name of the Tetraflow that appears in the Tetra Data Platform (TDP)
    • config: Defines the Tetraflow Pipeline's runtime settings
    • input (category: source): Defines the path to your data source
    • transform (category: processor): Defines your transformation logic in SQL
    • output (category: sink): Defines the target Delta Table(s)

tetraflow.yml File Example

tetraflowSchema: v1

name: My Tetraflow

options:
  batchMode: true
  framework: spark

config:
  input:
    label: "Input Table Name"
    description: "Name of the table to read from"
    type: string
    required: true
  output:
    label: "Output Table Name"
    description: "Name of the table to write to"
    type: string
    required: true

workflow:
  input:
    type: Delta
    category: source
    description: Read the input dataset
    properties:
      loadPath: $( table(config.input) )

  transform:
    type: Sql
    category: processor
    description: Transform the dataset
    needs: input
    properties:
      sql: |
        SELECT field_1, field_2 as config_input
        FROM input

  output:
    type: Delta
    category: sink
    description: Write the transformed data out
    needs: transform
    properties:
      savePath: $( table(config.output) )
      mode: overwrite

config Parameters

In the config property, enter the following information:

  • In the input section, for label, enter the source Intermediate Data Schema (IDS) table's name.
  • In the output section, for label, enter a name for the target Lakehouse table (Delta table) that you want to create.

config Property Example

config:
  input:
    label: "Input Table Name"
    description: "Name of the table to read from"
    type: string
    required: true
  output:
    label: "Output Table Name"
    description: "Name of the table to write to"
    type: string
    required: true

source (input) Parameters

In the Workflow section, for the input (category: source) property, enter the following information:

  • Replace input with the name of the data source.
  • For loadPath, enter the source IDS table name in the following format: $( table('table_name_v') ) (for example, loadPath: $( table('lcuv_empower_v16') ))

📘

NOTE

You can source data from multiple tables in the scope of a single workflow by adding more input code blocks.

source (input) Property Example

input:
    type: Delta
    category: source
    description: Read the input dataset
    properties:
      loadPath: $( table('table_name_v') )
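
For example, to combine data from two IDS tables in the same workflow, you might define two source blocks and reference them by name in your SQL. The block names below match the tables used in the processor example later in this guide; the Chromeleon table name is illustrative.

empower:
    type: Delta
    category: source
    description: Read the Empower IDS table
    properties:
      loadPath: $( table('lcuv_empower_v16') )

chromeleon:
    type: Delta
    category: source
    description: Read the Chromeleon IDS table
    properties:
      loadPath: $( table('lcuv_chromeleon_v1') )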

processor (transform) Properties

📘

NOTE

You can process data with multiple SQL statements in the scope of a single workflow by adding more processor code blocks.

In the Workflow section, for the transform (category: processor) property, enter the following information:

  • Replace transform with a short name for the SQL statement.
  • For description, enter a short description of the SQL statement.
  • For needs, enter input.
  • For sql, enter transformation logic for your pipeline by creating one or more SQL statements in Spark SQL syntax. Make sure that the statements can operate on any of the tables that you defined in the input property.

🚧

IMPORTANT

For the initial Data Lakehouse Architecture (EAP), it's recommended that you apply data deduplication best practices to your transformation logic. Applying these patterns ensures that downstream datasets use the latest records and that SQL queries on the new tables return the most current data.
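
For example, a common deduplication pattern keeps only the most recent record for each source file by using a window function. The following sketch assumes a source block named input and a hypothetical modified_at column that indicates record recency; the actual column you order by depends on your IDS.

WITH ranked AS (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY file_id      -- one row per source file
      ORDER BY modified_at DESC -- hypothetical recency column; use the field your IDS provides
    ) AS row_num
  FROM input
)
SELECT * FROM ranked
WHERE row_num = 1               -- keep only the latest record per file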

processor (transform) Property Example

The following transform property example defines a processor (named peaks) that extracts the peaks data from Empower, Chromeleon, and Unicorn IDS tables:

peaks:
    type: Sql
    category: processor
    description: Harmonization peaks tables (silver)
    needs: input
    properties:
      sql: |
        WITH empower_peaks AS (
          SELECT
            file_id AS injection_uuid,
            file_id AS result_uuid,
            file_id AS peak_uuid,
            file_id AS peak_id,
            rp2.amount.value AS amount_value,
            rp2.analyte AS analyte,
            rp2.area.value AS area_value,
            rp2.area.percent.value AS area_percent,
            rpw2.value AS peak_width,
            rp2.resolution.value AS resolution,
            rp2.retention.time.value AS retention_time,
            rp2.signal_to_noise.value AS signal_to_noise_ratio,
            rp2.symmetry_factor.value AS symmetry_factor,
            rp2.usp_tailing_factor.value AS usp_tailing
          FROM (
            SELECT file_id, rp, rpw
            FROM empower
            LATERAL VIEW EXPLODE(results.peaks) AS rp
            LATERAL VIEW EXPLODE(rp.widths) AS rpw
          )
          LATERAL VIEW EXPLODE(rp) AS rp2
          LATERAL VIEW EXPLODE(rpw) AS rpw2
        ),
        chromeleon_peaks AS (
          SELECT
            file_id AS injection_uuid,
            file_id AS result_uuid,
            file_id AS peak_uuid,
            file_id AS peak_id,
            rp.quantification.amount.value AS amount_value,
            rp.identification.name AS analyte,
            rp.quantification.area.value AS area_value,
            rp.quantification.area_percent.value AS area_percent,
            rp.resolution.peak_width.base.value AS peak_width,
            rp.resolution.calculated.value AS resolution,
            rp.retention.time.value AS retention_time,
            rp.statistics.signal_to_noise.value AS signal_to_noise_ratio,
            rp.statistics.symmetry.value AS symmetry_factor,
            rp.baseline.capacity_factor.value AS usp_tailing
          FROM chromeleon
          LATERAL VIEW EXPLODE(results) AS r
          LATERAL VIEW EXPLODE(r.peaks) AS rp
        ),
        unicorn_peaks AS (
          SELECT
            file_id AS injection_uuid,
            file_id AS result_uuid,
            file_id AS peak_uuid,
            file_id AS peak_id,
            '' AS amount_value,
            rp.name AS analyte,
            rp.area.value AS area_value,
            rp.area_percent.value AS area_percent,
            rp.width.value AS peak_width,
            rp.resolution.value AS resolution,
            '' AS retention_time,
            '' AS signal_to_noise_ratio,
            '' AS symmetry_factor,
            '' AS usp_tailing
          FROM akta
          LATERAL VIEW EXPLODE(result.peaks) AS rp
        )
        SELECT * FROM empower_peaks
        UNION ALL
        SELECT * FROM chromeleon_peaks
        UNION ALL
        SELECT * FROM unicorn_peaks

sink (output) Parameters

In the Workflow section, for the output (category: sink) property, enter the following information:

  • For needs, enter the name of the SQL statement that you created in the processor property (for example, peaks).
  • For savePath, enter the name of the target Lakehouse table that you want to create, in the following format: $( table('table_name_v') ) (for example, savePath: $( table('peaks') ))

📘

NOTE

You can transform source data into multiple Lakehouse tables in the scope of a single workflow by adding more sink code blocks.
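
sink (output) Property Example

The following output property example builds on the peaks processor example above and writes the harmonized peaks data to a Lakehouse table named peaks:

output:
    type: Delta
    category: sink
    description: Write the harmonized peaks table
    needs: peaks
    properties:
      savePath: $( table('peaks') )
      mode: overwrite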

Step 2: Deploy the Tetraflow Artifact

Deploy the Tetraflow artifact by doing the following:

  1. In your local terminal, navigate to the directory that contains the Tetraflow.
  2. Run the following command:
    ts-cli publish --config ./path_to_config

Step 3: Create a Pipeline by Using the Deployed Tetraflow Artifact

To use your new Tetraflow in the TDP, create a new pipeline that uses the Tetraflow that you deployed.

When creating the pipeline, make sure that you do the following:

  1. In the Select Pipeline Type dialog that appears on the Pipeline Manager page after you select the New Pipeline button, select Tetraflow.
  2. For Trigger Type, select Scheduled Trigger. This action creates a pipeline that runs at a specific, recurring time.
  3. Choose Next.
    Select Pipeline Type dialog
  4. In the Define Trigger section, configure a schedule for the Tetraflow Pipeline to run on. The minimum time interval is one minute, and the maximum is one year.
    Scheduled Trigger
  5. In the Select Protocol section, select the Tetraflow that you created from the list on the left.
  6. Configure the Cluster Size setting to define the size of the cluster that you want the TDP to spin up to run your Tetraflow Pipeline. For most use cases, the default Tiny cluster size is most appropriate.

    🚧

    IMPORTANT

    Before selecting any cluster size other than the default Tiny setting, please contact your customer success manager (CSM) or account executive. They will help you determine if a larger cluster size is required for your use case.

For more information, see Set Up a Pipeline.

Monitor Tetraflow Pipeline Processing

To monitor Tetraflow Pipeline processing, including files, workflows, and logs, do the following:

  1. Sign in to the TDP.

  2. In the left navigation menu, choose Pipelines. Then, choose Workflow Processing. The Workflow Processing page appears.
    Workflow Processing page

  3. To find a specific Tetraflow Pipeline, type the pipeline's name in the upper left search box. To filter your search by workflow date, enter a date range in the Start Date - End Date field. To filter by workflow status, select a status from the Select Status drop-down menu.

    📘

    NOTE

    The number of active workflows and the number of historical workflows appear near the top of the page. The active workflows count shows all workflows that are pending or currently being processed. The historical workflows count represents workflows that have completed or failed. If workflows are active, you can select the refresh button next to the active workflows count to view the latest status.

  4. Select a workflow. Two tabs appear in the right panel:

    • The Pipeline tab provides a link that takes you to a page where you can edit your pipeline, scan for unprocessed files, create a bulk pipeline process job, cancel pending workflows, and view high-level trigger and Tetraflow details.
    • The Workflow tab provides links to open workflows and view workflow logs.

Run a Tetraflow Pipeline Manually

To run a Tetraflow Pipeline manually, do the following:

  1. Open the Workflow Processing page.
  2. Select the Tetraflow Pipeline that you want to run.
  3. In the right panel, select the Pipeline tab.
  4. Choose Run Now. The Tetraflow Pipeline that you selected runs.

Edit a Tetraflow Pipeline

To edit a Tetraflow Pipeline, do the following:

  1. Open the Workflow Processing page.
  2. Select the Tetraflow Pipeline that you want to edit.
  3. In the right panel, select the Pipeline tab.
  4. Choose Edit Pipeline. The Pipeline Manager page appears.

For more information about how to edit pipelines, see Set Up and Edit Pipelines.