Skip to content

Commit

Permalink
Google Dataflow Hook to handle no Job Type (#14914)
Browse files Browse the repository at this point in the history
Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com>
  • Loading branch information
TobKed and turbaszek committed Mar 23, 2021
1 parent 7c2ed53 commit a7e144b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def _check_dataflow_job_state(self, job) -> bool:
:raise: Exception
"""
if self._wait_until_finished is None:
wait_for_running = job['type'] == DataflowJobType.JOB_TYPE_STREAMING
wait_for_running = job.get('type') == DataflowJobType.JOB_TYPE_STREAMING
else:
wait_for_running = not self._wait_until_finished

Expand Down
28 changes: 28 additions & 0 deletions tests/providers/google/cloud/hooks/test_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,34 @@ def test_check_dataflow_job_state_wait_until_finished(
result = dataflow_job._check_dataflow_job_state(job)
assert result == expected_result

# fmt: off
@parameterized.expand([
# RUNNING
(DataflowJobStatus.JOB_STATE_RUNNING, None, False),
(DataflowJobStatus.JOB_STATE_RUNNING, True, False),
(DataflowJobStatus.JOB_STATE_RUNNING, False, True),
# AWAITING STATE
(DataflowJobStatus.JOB_STATE_PENDING, None, False),
(DataflowJobStatus.JOB_STATE_PENDING, True, False),
(DataflowJobStatus.JOB_STATE_PENDING, False, True),
])
# fmt: on
def test_check_dataflow_job_state_without_job_type(self, job_state, wait_until_finished, expected_result):
job = {"id": "id-2", "name": "name-2", "currentState": job_state}
dataflow_job = _DataflowJobsController(
dataflow=self.mock_dataflow,
project_number=TEST_PROJECT,
name="name-",
location=TEST_LOCATION,
poll_sleep=0,
job_id=None,
num_retries=20,
multiple_jobs=True,
wait_until_finished=wait_until_finished,
)
result = dataflow_job._check_dataflow_job_state(job)
assert result == expected_result

# fmt: off
@parameterized.expand([
(DataflowJobType.JOB_TYPE_BATCH, DataflowJobStatus.JOB_STATE_FAILED,
Expand Down

0 comments on commit a7e144b

Please sign in to comment.
  翻译: