使用 Dataplex 进行数据沿袭

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

数据沿袭集成简介

数据沿袭是一个 Dataplex 功能,您可以跟踪数据在系统中是如何移动的: 数据来源、传递到数据的位置以及对数据应用了哪些转换。 数据沿袭适用于:

  • 运行 2.1.2 及更高版本的 Cloud Composer 2 环境,以及运行 2.2.5 及更高版本的 Airflow。

  • 支持数据谱系的数据目录区域相同的区域中的 Cloud Composer 2 环境。

在 Cloud Composer 环境中启用该功能后 如果 DAG 使用任何受支持的运算符, Cloud Composer 向 Data Lineage API 报告沿袭信息。

然后,您可以通过以下方式访问这些信息:

支持的运算符

以下运算符支持 Cloud Composer:

  • airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
  • airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
  • airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
  • airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
  • airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
  • airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
  • airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator

例如,运行以下任务:

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': GCP_PROJECT,
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

导致在 Dataplex 界面中创建以下沿袭图:

Dataplex 界面中的沿袭图示例。
图 1:BigQuery 表的沿袭图示例 Dataplex 界面。

Cloud Composer 的功能注意事项

每项报告数据沿袭的 Airflow 任务执行都会执行以下操作:

  • 针对谱系进程的一次创建或更新 RPC 请求
  • 针对谱系运行作业的一次创建或更新 RPC 请求
  • 用于创建沿袭事件的一个或多个 RPC 请求(大多数情况下为 0 或 1)

如需详细了解这些实体,请参阅 沿袭信息模型 和 Dataplex 中的 Lineage API 参考文档 文档。

发出的沿袭流量受制于 Data Lineage API 中的配额。 Cloud Composer 会消耗写入配额。

与处理谱系数据相关的价格取决于谱系价格。 请参阅数据沿袭注意事项

性能影响

数据沿袭会在 Airflow 任务执行结束时报告。 平均而言,数据沿袭报告大约需要 1-2 秒。

这不会影响任务本身的性能:Airflow 任务不会 如果沿袭未成功报告给 Lineage API,则会失败。 对主操作器逻辑没有影响,但整个任务实例会 执行时间略长一点,以便将沿袭数据纳入考虑范围。

由于报告数据谱系需要额外的时间,因此报告数据谱系的环境的相关费用会略有增加。

合规性

数据源流可为 VPC Service Controls 等功能提供不同的支持级别。查看数据谱系注意事项,确保支持级别符合您的环境要求。

使用数据沿袭集成

Cloud Composer 的数据沿袭集成通过 。这意味着,启用该功能需要完成以下两个步骤:

  1. 在项目中启用 Data Lineage API。
  2. 在特定 Cloud Composer 环境中启用数据沿袭集成。

准备工作

创建环境时,数据沿袭集成 如果满足以下条件,则会自动启用

  • 您的项目已启用 Data Lineage API。如需了解详情,请参阅 Dataplex 文档中的启用 Data Lineage API

  • 未在 Airflow 中配置自定义谱系后端

对于现有环境,您可以启用停用数据沿袭集成。

所需的角色

与数据沿袭集成需要为您的 Cloud Composer 环境服务账号:

  • 对于默认服务账号:无需进行任何更改。默认服务账号包含所需权限。
  • 对于用户管理的服务账号:向服务账号授予 Composer Worker (roles/composer.worker) 角色。此角色包含所有必要的数据传承权限。

如需了解详情,请参阅 Dataplex 文档中的沿袭角色和权限

在 Cloud Composer 中启用数据沿袭

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 选择环境配置标签页。

  4. Dataplex 数据沿袭集成部分中,点击修改

  5. Dataplex 数据沿袭集成面板中,选择启用与 Dataplex 数据沿袭的集成,然后点击保存

gcloud

使用 --enable-cloud-data-lineage-integration 参数。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。

    该名称必须以小写字母开头,后面最多可跟 62 个小写字母、数字或连字符,但不能以连字符结尾。该环境名称用于创建环境的子组件,因此您必须提供一个有效的 Cloud Storage 存储桶名称。如需查看限制列表,请参阅存储桶命名准则

  • LOCATION 替换为环境的区域。

    位置是指环境的 GKE 集群所在的区域 。

示例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

发送自定义沿袭事件

如果您想报告某个运营商的沿袭,可以发送自定义沿袭事件 自动沿袭报告不支持的数据。

例如,如需使用以下代码发送自定义事件,请执行以下操作:

  • BashOperator,修改任务定义中的 inletsoutlets 参数。
  • PythonOperator,请修改 task.inletstask.outlets 参数 。为 inlets 参数使用 AUTO 会将其值设为其上游任务的 outlets

例如,运行以下任务:


from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO



bash_task = BashOperator(
   task_id='bash_task',
   dag=dag,
   bash_command='sleep 0',
   inlets=[BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table1',
   )],
   outlets=[BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table2',
   )]
)


def _python_task(task):
   task.inlets.append(BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table3',
   ))

   task.outlets.append(BigQueryTable(
       project_id=GCP_PROJECT,
       dataset_id='dataset',
       table_id='table4',
   ))


python_task = PythonOperator(
   task_id='python_task',
   dag=dag,
   python_callable=_python_task,
   inlets=[AUTO],
)

bash_task >> python_task

这会在 Dataplex 界面中创建以下谱系图:

Dataplex 界面中自定义事件的谱系图示例。
图 2:多个 BigQuery 的沿袭图示例 Dataplex 界面中的各个表。

在 Cloud Composer 中停用数据沿袭

在 Cloud Composer 环境中停用谱系集成不会停用 Data Lineage API。如果要完全停用沿袭 报告项目的数据,也要停用 Data Lineage API。请参阅停用服务

控制台

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 选择环境配置标签页。

  4. Dataplex 数据沿袭集成部分中,点击修改

  5. Dataplex 数据沿袭集成面板中,选择停用与 Dataplex 数据沿袭的集成,然后点击保存

gcloud

使用 --disable-cloud-data-lineage-integration 参数。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

您需要将其中的:

  • ENVIRONMENT_NAME 替换为环境的名称。

    该名称必须以小写字母开头,后面最多可跟 62 个小写字母、数字或连字符,但不能以连字符结尾。该环境名称用于创建环境的子组件,因此您必须提供一个有效的 Cloud Storage 存储桶名称。如需查看限制列表,请参阅存储桶命名准则

  • LOCATION 替换为环境的区域。

    位置是指环境的 GKE 集群所在的区域 。

示例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

在 Cloud Composer 中查看沿袭日志

您可以使用 Dataplex 中的环境配置页面 数据沿袭集成部分。

问题排查

如果沿袭数据未报告给 Lineage API,或者您在 Dataplex,请尝试执行以下问题排查步骤:

  • 确保在项目的项目中启用了 Data Lineage API Cloud Composer 环境。
  • 检查是否已在 Cloud Composer 中启用数据沿袭集成 环境
  • 检查您使用的运营商是否包含在自动沿袭中 报告支持。请参阅支持的 Airflow 运算符
  • 在 Cloud Composer 中查看谱系日志,了解可能存在的问题。