Migrate Google azure_fileshare example DAG to new design AIP-47 (#24349)
related: #22430, #22447
chenglongyan committed Jun 12, 2022
1 parent 6ab02b6 commit bc3fc8c
Showing 3 changed files with 40 additions and 91 deletions.
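Background: AIP-47 ("New design of Airflow System Tests") moves provider example DAGs out of airflow/providers/.../example_dags/ and into tests/system/providers/..., where each example doubles as a self-contained system test: it provisions and tears down its own resources and can be run directly with pytest. The changes below follow that pattern: the documentation now includes the snippet from its new location, and the DAG gains setup/teardown tasks plus the standard watcher/get_test_run boilerplate.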
@@ -38,7 +38,7 @@ All parameters are described in the reference documentation - :class:`~airflow.p

 An example operator call might look like this:
 
-.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py
+.. exampleinclude:: /../../tests/system/providers/google/azure/example_azure_fileshare_to_gcs.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_azure_fileshare_to_gcs_basic]
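Why only the path changes here: the exampleinclude directive pulls into the rendered docs whatever lies between the :start-after: marker and its matching end marker in the referenced file (the corresponding :end-before: option falls in the lines truncated above), so the documentation keeps rendering the same snippet after the move. As a rough, runnable stand-in for that selection logic (the helper below is invented for illustration; only the marker names are taken from this diff):

def extract_snippet(source: str, start_after: str, end_before: str) -> str:
    # Keep only the lines strictly between the first line containing the
    # start marker and the next line containing the end marker.
    lines = source.splitlines()
    start = next(i for i, line in enumerate(lines) if start_after in line)
    end = next(i for i in range(start + 1, len(lines)) if end_before in lines[i])
    return "\n".join(lines[start + 1 : end])

example = '''\
# [START howto_operator_azure_fileshare_to_gcs_basic]
sync_azure_files_with_gcs = AzureFileShareToGCSOperator(...)
# [END howto_operator_azure_fileshare_to_gcs_basic]
'''
print(extract_snippet(
    example,
    "[START howto_operator_azure_fileshare_to_gcs_basic]",
    "[END howto_operator_azure_fileshare_to_gcs_basic]",
))
# prints the single line between the markers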

This file was deleted.

@@ -18,15 +18,20 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator
+from airflow.utils.trigger_rule import TriggerRule
 
-DEST_GCS_BUCKET = os.environ.get('GCP_GCS_BUCKET', 'gs://INVALID BUCKET NAME')
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = 'azure_fileshare_to_gcs_example'
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
 AZURE_SHARE_NAME = os.environ.get('AZURE_SHARE_NAME', 'test-azure-share')
 AZURE_DIRECTORY_NAME = "test-azure-dir"
 
 
 with DAG(
-    dag_id='azure_fileshare_to_gcs_example',
+    dag_id=DAG_ID,
     default_args={
         'owner': 'airflow',
         'depends_on_past': False,
@@ -39,16 +44,45 @@
     schedule_interval='@once',
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=['example'],
+    tags=['example', 'azure'],
 ) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+    )
+
     # [START howto_operator_azure_fileshare_to_gcs_basic]
     sync_azure_files_with_gcs = AzureFileShareToGCSOperator(
         task_id='sync_azure_files_with_gcs',
         share_name=AZURE_SHARE_NAME,
-        dest_gcs=DEST_GCS_BUCKET,
+        dest_gcs=BUCKET_NAME,
         directory_name=AZURE_DIRECTORY_NAME,
         replace=False,
         gzip=True,
         google_impersonation_chain=None,
     )
     # [END howto_operator_azure_fileshare_to_gcs_basic]
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        # TEST BODY
+        >> sync_azure_files_with_gcs
+        # TEST TEARDOWN
+        >> delete_bucket
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
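Two details of the migrated DAG are worth calling out. First, delete_bucket runs with trigger_rule=TriggerRule.ALL_DONE, so the bucket is removed even when the sync task fails. Second, because a teardown that always runs can let a failed run end in success, the watcher task is attached to every task in the DAG so that a failure in the test body still marks the whole DAG run as failed. To execute the example as a system test, set SYSTEM_TESTS_ENV_ID and SYSTEM_TESTS_GCP_PROJECT (and AZURE_SHARE_NAME, if the default test share is not available), then run pytest against the file, for example: pytest tests/system/providers/google/azure/example_azure_fileshare_to_gcs.py (see tests/system/README.md#run_via_pytest).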
