Skip to content

Commit

Permalink
Never set DagRun.state to State.NONE (#21263)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenglongyan authored Feb 9, 2022
1 parent 338f19e commit 07fe9e8
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
CloudDataFusionUpdateInstanceOperator,
)
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
from airflow.utils.state import State

# [START howto_data_fusion_env_variables]
SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
Expand Down Expand Up @@ -276,5 +275,5 @@
delete_pipeline >> delete_instance

if __name__ == "__main__":
dag.clear(dag_run_state=State.NONE)
dag.clear()
dag.run()
3 changes: 1 addition & 2 deletions airflow/providers/google/cloud/example_dags/example_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.state import State

START_DATE = datetime(2021, 1, 1)

Expand Down Expand Up @@ -215,5 +214,5 @@


if __name__ == '__main__':
dag.clear(dag_run_state=State.NONE)
dag.clear()
dag.run()
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
from airflow.providers.google.marketing_platform.sensors.campaign_manager import (
GoogleCampaignManagerReportSensor,
)
from airflow.utils.state import State

PROFILE_ID = os.environ.get("MARKETING_PROFILE_ID", "123456789")
FLOODLIGHT_ACTIVITY_ID = int(os.environ.get("FLOODLIGHT_ACTIVITY_ID", 12345))
Expand Down Expand Up @@ -166,5 +165,5 @@


if __name__ == "__main__":
dag.clear(dag_run_state=State.NONE)
dag.clear()
dag.run()
6 changes: 1 addition & 5 deletions tests/test_utils/system_tests_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from airflow.exceptions import AirflowException
from airflow.models.dagbag import DagBag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from tests.test_utils import AIRFLOW_MAIN_FOLDER
from tests.test_utils.logging_command_executor import get_executor

Expand Down Expand Up @@ -144,10 +143,7 @@ def run_dag(self, dag_id: str, dag_folder: str = DEFAULT_DAG_FOLDER) -> None:
)

self.log.info("Attempting to run DAG: %s", dag_id)
if os.environ.get("RUN_AIRFLOW_1_10") == "true":
dag.clear()
else:
dag.clear(dag_run_state=State.NONE)
dag.clear()
try:
dag.run(ignore_first_depends_on_past=True, verbose=True)
except Exception:
Expand Down
2 changes: 1 addition & 1 deletion tests/ti_deps/deps/test_dagrun_exists_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_dagrun_doesnt_exist(self, mock_dagrun_find):
Task instances without dagruns should fail this dep
"""
dag = DAG('test_dag', max_active_runs=2)
dagrun = DagRun(state=State.NONE)
dagrun = DagRun(state=State.QUEUED)
ti = Mock(task=Mock(dag=dag), get_dagrun=Mock(return_value=dagrun))
assert not DagrunRunningDep().is_met(ti=ti)

Expand Down

0 comments on commit 07fe9e8

Please sign in to comment.
  翻译: