Skip to content

Commit

Permalink
LookerStartPdtBuildOperator, LookerCheckPdtBuildSensor : fix empt…
Browse files Browse the repository at this point in the history
…y materialization id handling (#23025)

* fix empty materialization id handling
  • Loading branch information
alekseiloginov committed Apr 26, 2022
1 parent 43bcfa1 commit 37a7b27
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def execute(self, context: "Context") -> str:

self.materialization_id = resp.materialization_id

if self.materialization_id is None:
if not self.materialization_id:
raise AirflowException(
f'No `materialization_id` was returned for model: {self.model}, view: {self.view}.'
)
Expand Down
3 changes: 3 additions & 0 deletions airflow/providers/google/cloud/sensors/looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def poke(self, context: "Context") -> bool:

self.hook = LookerHook(looker_conn_id=self.looker_conn_id)

if not self.materialization_id:
raise AirflowException('Invalid `materialization_id`.')

# materialization_id is templated var pulling output from start task
status_dict = self.hook.pdt_build_status(materialization_id=self.materialization_id)
status = status_dict['status']
Expand Down
23 changes: 23 additions & 0 deletions tests/providers/google/cloud/operators/test_looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
from unittest import mock
from unittest.mock import MagicMock

import pytest

from airflow.exceptions import AirflowException
from airflow.models import DAG, DagBag
from airflow.providers.google.cloud.operators.looker import LookerStartPdtBuildOperator
from airflow.utils.timezone import datetime
Expand Down Expand Up @@ -146,3 +149,23 @@ def test_on_kill(self, mock_hook):
task.cancel_on_kill = True
task.on_kill()
mock_hook.return_value.stop_pdt_build.assert_called_once_with(materialization_id=TEST_JOB_ID)

@mock.patch(OPERATOR_PATH.format("LookerHook"))
def test_materialization_id_returned_as_empty_str(self, mock_hook):
# mock return vals from hook
mock_hook.return_value.start_pdt_build.return_value.materialization_id = ""
mock_hook.return_value.wait_for_job.return_value = None

# run task in mock context (asynchronous=False)
task = LookerStartPdtBuildOperator(
task_id=TASK_ID,
looker_conn_id=LOOKER_CONN_ID,
model=MODEL,
view=VIEW,
)

# check AirflowException is raised
with pytest.raises(
AirflowException, match=f'No `materialization_id` was returned for model: {MODEL}, view: {VIEW}.'
):
task.execute(context=self.mock_context)
12 changes: 12 additions & 0 deletions tests/providers/google/cloud/sensors/test_looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,15 @@ def test_cancelled(self, mock_hook):

# assert hook.pdt_build_status called once
mock_hook.return_value.pdt_build_status.assert_called_once_with(materialization_id=TEST_JOB_ID)

def test_empty_materialization_id(self):

# run task in mock context
sensor = LookerCheckPdtBuildSensor(
task_id=TASK_ID,
looker_conn_id=LOOKER_CONN_ID,
materialization_id="",
)

with pytest.raises(AirflowException, match="^Invalid `materialization_id`.$"):
sensor.poke(context={})

0 comments on commit 37a7b27

Please sign in to comment.
  翻译: