Add append_job_name parameter in DataflowStartFlexTemplateOperator (#…
VladaZakharova authored May 24, 2023
1 parent e4f5cb0 commit 769e204
Showing 5 changed files with 69 additions and 5 deletions.
6 changes: 5 additions & 1 deletion airflow/providers/google/cloud/operators/dataflow.py
@@ -782,6 +782,7 @@ class DataflowStartFlexTemplateOperator(GoogleCloudBaseOperator):
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:param deferrable: Run operator in the deferrable mode.
:param append_job_name: True if a unique suffix has to be appended to the job name.
"""

template_fields: Sequence[str] = ("body", "location", "project_id", "gcp_conn_id")
@@ -798,6 +799,7 @@ def __init__(
wait_until_finished: bool | None = None,
impersonation_chain: str | Sequence[str] | None = None,
deferrable: bool = False,
append_job_name: bool = True,
*args,
**kwargs,
) -> None:
@@ -812,6 +814,7 @@ def __init__(
self.job: dict | None = None
self.impersonation_chain = impersonation_chain
self.deferrable = deferrable
self.append_job_name = append_job_name

self._validate_deferrable_params()

@@ -838,7 +841,8 @@ def hook(self) -> DataflowHook:
return hook

def execute(self, context: Context):
self._append_uuid_to_job_name()
if self.append_job_name:
self._append_uuid_to_job_name()

def set_current_job(current_job):
self.job = current_job
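For context, a minimal sketch of what the new flag toggles: with append_job_name left at its default of True, execute() still calls the internal _append_uuid_to_job_name() before launching; set to False, the jobName from the request body is sent untouched. The helper below is a hypothetical stand-in for illustration only; the suffix formatting inside the real operator may differ.

# Hypothetical stand-in for the operator's internal helper; the actual
# _append_uuid_to_job_name() implementation may format the suffix differently.
import uuid

def append_uuid_to_job_name(job_name: str) -> str:
    # A random suffix keeps job names unique across repeated DAG runs.
    return f"{job_name}-{uuid.uuid4().hex[:8]}"

body = {"launchParameter": {"jobName": "test-flex-template"}}
append_job_name = False  # the new parameter; defaults to True

if append_job_name:
    body["launchParameter"]["jobName"] = append_uuid_to_job_name(
        body["launchParameter"]["jobName"]
    )

print(body["launchParameter"]["jobName"])  # -> test-flex-template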
@@ -212,8 +212,8 @@ Here is an example of running Flex template with
.. exampleinclude:: /../../tests/system/providers/google/cloud/dataflow/example_dataflow_template.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_template_job]
:end-before: [END howto_operator_start_template_job]
:start-after: [START howto_operator_start_flex_template_job]
:end-before: [END howto_operator_start_flex_template_job]

.. _howto/operator:DataflowStartSqlJobOperator:

tests/system/providers/google/cloud/dataflow/example_dataflow_template.py
@@ -27,7 +27,10 @@
from pathlib import Path

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.providers.google.cloud.operators.dataflow import (
DataflowStartFlexTemplateOperator,
DataflowTemplatedJobStartOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.trigger_rule import TriggerRule
@@ -39,10 +42,12 @@
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"

FILE_NAME = "text.txt"
SCHEMA = "schema.json"
GCS_TMP = f"gs://{BUCKET_NAME}/temp/"
GCS_STAGING = f"gs://{BUCKET_NAME}/staging/"
GCS_OUTPUT = f"gs://{BUCKET_NAME}/output"
PYTHON_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
SCHEMA_LOCAL_PATH = str(Path(__file__).parent / "resources" / SCHEMA)
LOCATION = "europe-west3"

default_args = {
@@ -51,6 +56,20 @@
"stagingLocation": GCS_STAGING,
}
}
BODY = {
"launchParameter": {
"jobName": "test-flex-template",
"parameters": {
"inputFileSpec": f"gs://{BUCKET_NAME}/{FILE_NAME}",
"outputBucket": f"gs://{BUCKET_NAME}/output/file.avro",
"outputFileFormat": "avro",
"inputFileFormat": "csv",
"schema": f"gs://{BUCKET_NAME}/{SCHEMA}",
},
"environment": {},
"containerSpecGcsPath": "gs://dataflow-templates/latest/flex/File_Format_Conversion",
},
}
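# The File_Format_Conversion Flex template referenced in containerSpecGcsPath
# reads the CSV file the DAG below uploads, converts it to Avro using the
# uploaded schema.json, and writes the converted file to the "outputBucket" path.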

with models.DAG(
DAG_ID,
@@ -69,6 +88,13 @@
bucket=BUCKET_NAME,
)

upload_schema = LocalFilesystemToGCSOperator(
task_id="upload_schema_to_bucket",
src=SCHEMA_LOCAL_PATH,
dst=SCHEMA,
bucket=BUCKET_NAME,
)

# [START howto_operator_start_template_job]
start_template_job = DataflowTemplatedJobStartOperator(
task_id="start_template_job",
@@ -79,11 +105,28 @@
)
# [END howto_operator_start_template_job]

# [START howto_operator_start_flex_template_job]
start_flex_template_job = DataflowStartFlexTemplateOperator(
task_id="start_flex_template_job",
project_id=PROJECT_ID,
body=BODY,
location=LOCATION,
append_job_name=False,
)
# [END howto_operator_start_flex_template_job]
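    # With append_job_name=False, the jobName from BODY is sent to Dataflow
    # verbatim; Dataflow requires job names to be unique among a project's
    # active jobs, so a re-run while a previous instance is still active may
    # be rejected.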

delete_bucket = GCSDeleteBucketOperator(
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
)

create_bucket >> upload_file >> start_template_job >> delete_bucket
(
create_bucket
>> upload_file
>> upload_schema
>> start_template_job
>> start_flex_template_job
>> delete_bucket
)

from tests.system.utils.watcher import watcher

14 changes: 14 additions & 0 deletions tests/system/providers/google/cloud/dataflow/resources/schema.json
@@ -0,0 +1,14 @@
{
"type": "record",
"name": "Person",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "surname",
"type": "string"
}
]
}
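As a quick sanity check, the schema above parses as valid Avro; a minimal sketch, assuming the third-party fastavro package (which is not used by the commit itself):

# Validate schema.json locally; assumes `pip install fastavro`.
import json

from fastavro.schema import parse_schema

with open("schema.json") as f:
    parse_schema(json.load(f))  # raises SchemaParseException if the schema is invalid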
@@ -0,0 +1,3 @@
name,surname
John,Doe
Jane,Smith
