Skip to content

Commit

Permalink
Improve Google PubSub hook publish method (#7831)
Browse files Browse the repository at this point in the history
  • Loading branch information
turbaszek authored Mar 23, 2020
1 parent 4bde99f commit 529db07
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
8 changes: 4 additions & 4 deletions airflow/providers/google/cloud/example_dags/example_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
) as example_sensor_dag:
# [START howto_operator_gcp_pubsub_create_topic]
create_topic = PubSubCreateTopicOperator(
task_id="create_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID
task_id="create_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID, fail_if_exists=False
)
# [END howto_operator_gcp_pubsub_create_topic]

Expand Down Expand Up @@ -84,7 +84,7 @@
task_id="publish_task",
project_id=GCP_PROJECT_ID,
topic=TOPIC_FOR_SENSOR_DAG,
messages=[MESSAGE, MESSAGE, MESSAGE],
messages=[MESSAGE] * 10,
)
# [END howto_operator_gcp_pubsub_publish]

Expand All @@ -102,8 +102,8 @@
)
# [END howto_operator_gcp_pubsub_delete_topic]

create_topic >> subscribe_task >> publish_task
subscribe_task >> pull_messages >> pull_messages_result >> unsubscribe_task >> delete_topic
create_topic >> subscribe_task >> [publish_task, pull_messages]
pull_messages >> pull_messages_result >> unsubscribe_task >> delete_topic


with models.DAG(
Expand Down
5 changes: 3 additions & 2 deletions airflow/providers/google/cloud/hooks/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,14 @@ def publish(
self.log.info("Publish %d messages to topic (path) %s", len(messages), topic_path)
try:
for message in messages:
publisher.publish(
future = publisher.publish(
topic=topic_path,
data=message.get("data", b''),
**message.get('attributes', {})
)
future.result()
except GoogleAPICallError as e:
raise PubSubException('Error publishing to topic {}'.format(topic_path), e)
raise PubSubException(f'Error publishing to topic {topic_path}', e)

self.log.info("Published %d messages to topic (path) %s", len(messages), topic_path)

Expand Down

0 comments on commit 529db07

Please sign in to comment.
  翻译: