Skip to content

Commit

Permalink
Merge GCSObjectExistenceAsyncSensor logic to GCSObjectExistenceSensor (
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W authored Mar 21, 2023
1 parent 6a6bff3 commit 79a2fa7
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 41 deletions.
72 changes: 44 additions & 28 deletions airflow/providers/google/cloud/sensors/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import textwrap
import warnings
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Callable, Sequence
from typing import TYPE_CHECKING, Any, Callable, Sequence

from google.api_core.retry import Retry
from google.cloud.storage.retry import DEFAULT_RETRY
Expand Down Expand Up @@ -75,6 +75,7 @@ def __init__(
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
retry: Retry = DEFAULT_RETRY,
deferrable: bool = False,
**kwargs,
) -> None:

Expand All @@ -90,6 +91,8 @@ def __init__(
self.impersonation_chain = impersonation_chain
self.retry = retry

self.deferrable = deferrable

def poke(self, context: Context) -> bool:
self.log.info("Sensor checks existence of : %s, %s", self.bucket, self.object)
hook = GCSHook(
Expand All @@ -99,10 +102,43 @@ def poke(self, context: Context) -> bool:
)
return hook.exists(self.bucket, self.object, self.retry)

def execute(self, context: Context) -> None:
    """
    Run the sensor, either through the parent class or by deferring to a trigger.

    When ``deferrable`` is false the parent class's ``execute`` is called.
    Otherwise the task is deferred with a ``GCSBlobTrigger`` built from this
    sensor's bucket, object, poke interval and connection settings, and
    ``execute_complete`` is named as the method to resume in.
    """
    if not self.deferrable:
        super().execute(context)
        return

    # Reuse the sensor's own timeout as the deferral timeout.
    defer_timeout = timedelta(seconds=self.timeout)
    hook_params = {
        "delegate_to": self.delegate_to,
        "impersonation_chain": self.impersonation_chain,
    }
    blob_trigger = GCSBlobTrigger(
        bucket=self.bucket,
        object_name=self.object,
        poke_interval=self.poke_interval,
        google_cloud_conn_id=self.google_cloud_conn_id,
        hook_params=hook_params,
    )
    self.defer(
        timeout=defer_timeout,
        trigger=blob_trigger,
        method_name="execute_complete",
    )

def execute_complete(self, context: Context, event: dict[str, str]) -> str:
    """
    Handle the event delivered when the trigger fires.

    :param context: The task context (not used by this method).
    :param event: Trigger payload; its ``status`` and ``message`` keys are read.
    :return: The event's ``message`` when the status is not ``"error"``.
    :raises AirflowException: With the event's ``message`` when the status is ``"error"``.
    """
    if event["status"] != "error":
        self.log.info("File %s was found in bucket %s.", self.object, self.bucket)
        return event["message"]
    raise AirflowException(event["message"])


class GCSObjectExistenceAsyncSensor(GCSObjectExistenceSensor):
"""
Checks for the existence of a file in Google Cloud Storage .
Checks for the existence of a file in Google Cloud Storage.
Class `GCSObjectExistenceAsyncSensor` is deprecated and will be removed in a future release.
Please use `GCSObjectExistenceSensor` and set `deferrable` attribute to `True` instead
:param bucket: The Google Cloud Storage bucket where the object is.
:param object: The name of the object to check in the Google cloud storage bucket.
Expand All @@ -120,33 +156,13 @@ class GCSObjectExistenceAsyncSensor(GCSObjectExistenceSensor):
account from the list granting this role to the originating account (templated).
"""

def execute(self, context: Context) -> None:
"""Airflow runs this method on the worker and defers using the trigger."""
self.defer(
timeout=timedelta(seconds=self.timeout),
trigger=GCSBlobTrigger(
bucket=self.bucket,
object_name=self.object,
poke_interval=self.poke_interval,
google_cloud_conn_id=self.google_cloud_conn_id,
hook_params={
"delegate_to": self.delegate_to,
"impersonation_chain": self.impersonation_chain,
},
),
method_name="execute_complete",
def __init__(self, **kwargs: Any) -> None:
warnings.warn(
"Class `GCSObjectExistenceAsyncSensor` is deprecated and will be removed in a future release. "
"Please use `GCSObjectExistenceSensor` and set `deferrable` attribute to `True` instead",
DeprecationWarning,
)

def execute_complete(self, context: Context, event: dict[str, str]) -> str:
"""
Callback for when the trigger fires - returns immediately.
Relies on trigger to throw an exception, otherwise it assumes execution was
successful.
"""
if event["status"] == "error":
raise AirflowException(event["message"])
self.log.info("File %s was found in bucket %s.", self.object, self.bucket)
return event["message"]
super().__init__(deferrable=True, **kwargs)


def ts_function(context):
Expand Down
10 changes: 8 additions & 2 deletions docs/apache-airflow-providers-google/operators/cloud/gcs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,20 @@ Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSe
:start-after: [START howto_sensor_object_exists_task]
:end-before: [END howto_sensor_object_exists_task]

You can also use deferrable mode in this operator if you would like to free up worker slots while the sensor is running.

.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_object_exists_task_defered]
:end-before: [END howto_sensor_object_exists_task_defered]

.. _howto/sensor:GCSObjectExistenceAsyncSensor:

GCSObjectExistenceAsyncSensor
-----------------------------

Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceAsyncSensor`
(deferrable version) if you would like to free up the worker slots while the sensor is running.
:class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceAsyncSensor` is deprecated and will be removed in a future release. Please use :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor` and use the deferrable mode in that operator.

.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
:language: python
Expand Down
69 changes: 59 additions & 10 deletions tests/providers/google/cloud/sensors/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,47 +98,96 @@ def test_should_pass_argument_to_hook(self, mock_hook):
)
mock_hook.return_value.exists.assert_called_once_with(TEST_BUCKET, TEST_OBJECT, DEFAULT_RETRY)


class TestGoogleCloudStorageObjectSensorAsync:
def test_gcs_object_existence_sensor_async(self):
def test_gcs_object_existence_sensor_deferred(self):
"""
Asserts that a task is deferred and a GCSBlobTrigger will be fired
when the GCSObjectExistenceAsyncSensor is executed.
when the GCSObjectExistenceSensor is executed and deferrable is set to True.
"""
task = GCSObjectExistenceAsyncSensor(
task = GCSObjectExistenceSensor(
task_id="task-id",
bucket=TEST_BUCKET,
object=TEST_OBJECT,
google_cloud_conn_id=TEST_GCP_CONN_ID,
deferrable=True,
)
with pytest.raises(TaskDeferred) as exc:
task.execute(context)
assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"

def test_gcs_object_existence_sensor_async_execute_failure(self):
"""Tests that an AirflowException is raised in case of error event"""
task = GCSObjectExistenceAsyncSensor(
def test_gcs_object_existence_sensor_deferred_execute_failure(self):
"""Tests that an AirflowException is raised in case of error event when deferrable is set to True"""
task = GCSObjectExistenceSensor(
task_id="task-id",
bucket=TEST_BUCKET,
object=TEST_OBJECT,
google_cloud_conn_id=TEST_GCP_CONN_ID,
deferrable=True,
)
with pytest.raises(AirflowException):
task.execute_complete(context=None, event={"status": "error", "message": "test failure message"})

def test_gcs_object_existence_sensor_async_execute_complete(self):
"""Asserts that logging occurs as expected"""
task = GCSObjectExistenceAsyncSensor(
"""Asserts that logging occurs as expected when deferrable is set to True"""
task = GCSObjectExistenceSensor(
task_id="task-id",
bucket=TEST_BUCKET,
object=TEST_OBJECT,
google_cloud_conn_id=TEST_GCP_CONN_ID,
deferrable=True,
)
with mock.patch.object(task.log, "info") as mock_log_info:
task.execute_complete(context=None, event={"status": "success", "message": "Job completed"})
mock_log_info.assert_called_with("File %s was found in bucket %s.", TEST_OBJECT, TEST_BUCKET)


class TestGoogleCloudStorageObjectSensorAsync:
    """Tests for the deprecated ``GCSObjectExistenceAsyncSensor`` class."""

    # Warning text expected on construction; used as the ``match`` pattern for ``pytest.warns``.
    deprecation_message = (
        "Class `GCSObjectExistenceAsyncSensor` is deprecated and will be removed in a future release. "
        "Please use `GCSObjectExistenceSensor` and set `deferrable` attribute to `True` instead"
    )

    def test_gcs_object_existence_sensor_async(self):
        """
        Asserts that a task is deferred and a GCSBlobTrigger will be fired
        when the GCSObjectExistenceAsyncSensor is executed.
        """
        with pytest.warns(DeprecationWarning, match=self.deprecation_message):
            task = GCSObjectExistenceAsyncSensor(
                task_id="task-id",
                bucket=TEST_BUCKET,
                object=TEST_OBJECT,
                google_cloud_conn_id=TEST_GCP_CONN_ID,
            )
        with pytest.raises(TaskDeferred) as exc:
            task.execute(context)
        assert isinstance(exc.value.trigger, GCSBlobTrigger), "Trigger is not a GCSBlobTrigger"

    def test_gcs_object_existence_sensor_async_execute_failure(self):
        """Tests that an AirflowException is raised in case of error event"""
        with pytest.warns(DeprecationWarning, match=self.deprecation_message):
            task = GCSObjectExistenceAsyncSensor(
                task_id="task-id",
                bucket=TEST_BUCKET,
                object=TEST_OBJECT,
                google_cloud_conn_id=TEST_GCP_CONN_ID,
            )
        with pytest.raises(AirflowException):
            task.execute_complete(context=None, event={"status": "error", "message": "test failure message"})

    def test_gcs_object_existence_sensor_async_execute_complete(self):
        """Asserts that logging occurs as expected"""
        with pytest.warns(DeprecationWarning, match=self.deprecation_message):
            task = GCSObjectExistenceAsyncSensor(
                task_id="task-id",
                bucket=TEST_BUCKET,
                object=TEST_OBJECT,
                google_cloud_conn_id=TEST_GCP_CONN_ID,
            )
        with mock.patch.object(task.log, "info") as mock_log_info:
            task.execute_complete(context=None, event={"status": "success", "message": "Job completed"})
        mock_log_info.assert_called_with("File %s was found in bucket %s.", TEST_OBJECT, TEST_BUCKET)


class TestTsFunction:
def test_should_support_datetime(self):
context = {
Expand Down
13 changes: 12 additions & 1 deletion tests/system/providers/google/cloud/gcs/example_gcs_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ def mode_setter(self, value):
)
# [END howto_sensor_object_exists_task_async]

# [START howto_sensor_object_exists_task_defered]
gcs_object_exists_defered = GCSObjectExistenceSensor(
bucket=BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
)
# [END howto_sensor_object_exists_task_defered]

# [START howto_sensor_object_with_prefix_exists_task]
gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
bucket=BUCKET_NAME,
Expand All @@ -144,7 +150,12 @@ def mode_setter(self, value):
sleep,
upload_file,
# TEST BODY
[gcs_object_exists, gcs_object_exists_async, gcs_object_with_prefix_exists],
[
gcs_object_exists,
gcs_object_exists_defered,
gcs_object_exists_async,
gcs_object_with_prefix_exists,
],
# TEST TEARDOWN
delete_bucket,
)
Expand Down

0 comments on commit 79a2fa7

Please sign in to comment.
  翻译: