Skip to content

Commit

Permalink
fix(providers/google-marketing-platform): respect soft_fail argument …
Browse files Browse the repository at this point in the history
…when exception is raised (#34165)
  • Loading branch information
Lee-W authored Sep 7, 2023
1 parent f5857a9 commit ff23a30
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from typing import TYPE_CHECKING, Sequence

from airflow import AirflowException
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.google.marketing_platform.hooks.display_video import GoogleDisplayVideo360Hook
from airflow.sensors.base import BaseSensorOperator

Expand Down Expand Up @@ -86,9 +86,12 @@ def poke(self, context: Context) -> bool:
impersonation_chain=self.impersonation_chain,
)
operation = hook.get_sdf_download_operation(operation_name=self.operation_name)

if "error" in operation:
raise AirflowException(f'The operation finished in error with {operation["error"]}')
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = f'The operation finished in error with {operation["error"]}'
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
if operation and operation.get("done"):
return True
return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@

from unittest import mock

import pytest

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.google.marketing_platform.sensors.display_video import (
GoogleDisplayVideo360GetSDFDownloadOperationSensor,
GoogleDisplayVideo360RunQuerySensor,
)

MODULE_NAME = "airflow.providers.google.marketing_platform.sensors.display_video"

API_VERSION = "api_version"
GCP_CONN_ID = "google_cloud_default"


class TestGoogleDisplayVideo360RunQuerySensor:
@mock.patch("airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360Hook")
@mock.patch("airflow.providers.google.marketing_platform.sensors.display_video.BaseSensorOperator")
@mock.patch(f"{MODULE_NAME}.GoogleDisplayVideo360Hook")
@mock.patch(f"{MODULE_NAME}.BaseSensorOperator")
def test_poke(self, mock_base_op, hook_mock):
query_id = "QUERY_ID"
report_id = "REPORT_ID"
Expand All @@ -46,8 +51,8 @@ def test_poke(self, mock_base_op, hook_mock):


class TestGoogleDisplayVideo360Sensor:
@mock.patch("airflow.providers.google.marketing_platform.sensors.display_video.GoogleDisplayVideo360Hook")
@mock.patch("airflow.providers.google.marketing_platform.sensors.display_video.BaseSensorOperator")
@mock.patch(f"{MODULE_NAME}.GoogleDisplayVideo360Hook")
@mock.patch(f"{MODULE_NAME}.BaseSensorOperator")
def test_poke(self, mock_base_op, hook_mock):
operation_name = "operation_name"
op = GoogleDisplayVideo360GetSDFDownloadOperationSensor(
Expand All @@ -65,3 +70,23 @@ def test_poke(self, mock_base_op, hook_mock):
hook_mock.return_value.get_sdf_download_operation.assert_called_once_with(
operation_name=operation_name
)

@pytest.mark.parametrize(
"soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
)
@mock.patch(f"{MODULE_NAME}.GoogleDisplayVideo360Hook")
@mock.patch(f"{MODULE_NAME}.BaseSensorOperator")
def test_poke_with_exception(
self, mock_base_op, hook_mock, soft_fail: bool, expected_exception: AirflowException
):
operation_name = "operation_name"
op = GoogleDisplayVideo360GetSDFDownloadOperationSensor(
operation_name=operation_name,
api_version=API_VERSION,
task_id="test_task",
soft_fail=soft_fail,
)
hook_mock.return_value.get_sdf_download_operation.return_value = {"error": "error"}

with pytest.raises(expected_exception, match="The operation finished in error with error"):
op.poke(context=None)

0 comments on commit ff23a30

Please sign in to comment.
  翻译: