Skip to content

Commit

Permalink
fix(google): add return statement to yield within a while loop in tri…
Browse files Browse the repository at this point in the history
…ggers (#38394)
  • Loading branch information
Lee-W committed Mar 22, 2024
1 parent eead6c2 commit da4f6f0
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 6 deletions.
3 changes: 1 addition & 2 deletions airflow/providers/google/cloud/triggers/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"records": None,
}
)
return
else:
# Extract only first record from the query results
first_record = records.pop(0)
Expand All @@ -171,7 +170,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"records": first_record,
}
)
return
return
elif job_status["status"] == "error":
yield TriggerEvent({"status": "error", "message": job_status["message"]})
return
Expand Down
1 change: 1 addition & 0 deletions airflow/providers/google/cloud/triggers/cloud_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
"job_name": self.job_name,
}
)
return
elif operation.error.message:
raise AirflowException(f"Cloud Run Job error: {operation.error.message}")

Expand Down
1 change: 1 addition & 0 deletions airflow/providers/google/cloud/triggers/cloud_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ async def run(self):
}
)
return

yield TriggerEvent(
{
"operation_name": operation["name"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
self.log.info("Operation is still running.")
self.log.info("Sleeping for %ss...", self.poll_interval)
await asyncio.sleep(self.poll_interval)

else:
yield TriggerEvent(
{
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/google/cloud/triggers/mlengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"message": "Job completed",
}
)
return
elif response_from_hook == "pending":
self.log.info("Job is still running...")
self.log.info("Sleeping for %s seconds.", self.poll_interval)
await asyncio.sleep(self.poll_interval)
else:
yield TriggerEvent({"status": "error", "message": response_from_hook})
return

except Exception as e:
self.log.exception("Exception occurred while checking for query completion")
Expand Down
5 changes: 2 additions & 3 deletions airflow/providers/google/cloud/triggers/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
if pulled_messages:
if self.ack_messages:
await self.message_acknowledgement(pulled_messages)
yield TriggerEvent({"status": "success", "message": pulled_messages})
else:
yield TriggerEvent({"status": "success", "message": pulled_messages})
yield TriggerEvent({"status": "success", "message": pulled_messages})
return
else:
pulled_messages = await self.hook.pull(
project_id=self.project_id,
Expand Down

0 comments on commit da4f6f0

Please sign in to comment.
  翻译: