Fix Flake8 errors (#8841)
kaxil authored May 12, 2020
1 parent 1d12c34 commit 4b06fde
Showing 14 changed files with 47 additions and 47 deletions.
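Most of the hunks below remove a stray "f" prefix from string literals that interpolate nothing, which recent pyflakes releases report as F541 ("f-string is missing placeholders"); the remaining hunks re-flow line continuations that pycodestyle flags (E122/E123 and related checks, visible in the noqa comments). A minimal sketch of the f-string case, not taken from this commit:

    # Hypothetical illustration of the pattern fixed throughout this commit.
    # flake8 (via newer pyflakes) flags an f-string with nothing to interpolate:
    task_id = f"create_table"   # F541: f-string is missing placeholders

    # The fix is simply to drop the redundant prefix:
    task_id = "create_table"

    # The prefix stays wherever a placeholder is actually used:
    dag_id = "example_dag"
    message = f"processing {dag_id}"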
38 changes: 19 additions & 19 deletions airflow/api/common/experimental/mark_tasks.py
@@ -136,34 +136,34 @@ def set_state(
 # Flake and pylint disagree about correct indents here
 def all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates):  # noqa: E123
     """Get *all* tasks of the sub dags"""
-    qry_sub_dag = session.query(TaskInstance).\
+    qry_sub_dag = session.query(TaskInstance). \
         filter(
-            TaskInstance.dag_id.in_(sub_dag_run_ids),
-            TaskInstance.execution_date.in_(confirmed_dates)  # noqa: E123
-        ).\
+            TaskInstance.dag_id.in_(sub_dag_run_ids),
+            TaskInstance.execution_date.in_(confirmed_dates)
+        ). \
         filter(
-            or_(
-                TaskInstance.state.is_(None),
-                TaskInstance.state != state
-            )
-        )  # noqa: E123
+            or_(
+                TaskInstance.state.is_(None),
+                TaskInstance.state != state
+            )
+        )  # noqa: E123
     return qry_sub_dag


-def get_all_dag_task_query(dag, session, state, task_ids, confirmed_dates):  # noqa: E123
+def get_all_dag_task_query(dag, session, state, task_ids, confirmed_dates):
     """Get all tasks of the main dag that will be affected by a state change"""
-    qry_dag = session.query(TaskInstance).\
+    qry_dag = session.query(TaskInstance). \
         filter(
-            TaskInstance.dag_id == dag.dag_id,
-            TaskInstance.execution_date.in_(confirmed_dates),
-            TaskInstance.task_id.in_(task_ids)  # noqa: E123
-        ).\
+            TaskInstance.dag_id == dag.dag_id,
+            TaskInstance.execution_date.in_(confirmed_dates),
+            TaskInstance.task_id.in_(task_ids)  # noqa: E123
+        ). \
         filter(
-            or_(
-                TaskInstance.state.is_(None),
-                TaskInstance.state != state
-            )
-        )
+            or_(
+                TaskInstance.state.is_(None),
+                TaskInstance.state != state
+            )
+        )
     return qry_dag


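The two queries above keep the existing backslash-continuation style (plus the # noqa: E123 markers) so that flake8 and pylint both stay quiet about the indentation. For comparison only, the same query can be written with no backslashes at all by parenthesizing the whole chain, which sidesteps the continuation-indent checks; a minimal sketch, not part of this commit, assuming the usual imports for the names used in the file above:

    # Sketch only: parenthesized method chaining instead of "\" continuations.
    # Import paths are assumptions; the real module already has these names in scope.
    from sqlalchemy import or_

    from airflow.models import TaskInstance


    def all_subdag_tasks_query(sub_dag_run_ids, session, state, confirmed_dates):
        """Get *all* tasks of the sub dags."""
        return (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id.in_(sub_dag_run_ids),
                TaskInstance.execution_date.in_(confirmed_dates),
            )
            .filter(
                or_(
                    TaskInstance.state.is_(None),
                    TaskInstance.state != state,
                )
            )
        )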
11 changes: 5 additions & 6 deletions airflow/jobs/scheduler_job.py
@@ -561,12 +561,11 @@ def create_dag_run(self, dag, dag_runs=None, session=None):
         # this query should be replaced by find dagrun
         qry = (
             session.query(func.max(DagRun.execution_date))
-            .filter_by(dag_id=dag.dag_id)
-            .filter(or_(
-                DagRun.external_trigger == False,  # noqa: E712 pylint: disable=singleton-comparison
-                # add % as a wildcard for the like query
-                DagRun.run_id.like(f"{DagRunType.SCHEDULED.value}__%")
-            )
+            .filter_by(dag_id=dag.dag_id)
+            .filter(or_(
+                DagRun.external_trigger == False,  # noqa: E712 pylint: disable=singleton-comparison
+                # add % as a wildcard for the like query
+                DagRun.run_id.like(f"{DagRunType.SCHEDULED.value}__%"))
             )
         )
         last_scheduled_run = qry.scalar()
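The == False comparison above is kept deliberately and only silenced with # noqa: E712: inside a SQLAlchemy filter the expression must be a column comparison that compiles to SQL, so the plain-Python spellings (is False, not ...) would not build the intended WHERE clause. A sketch of the alternative spelling some SQLAlchemy code bases use to avoid the noqa, shown here only for comparison and reusing the names from the hunk above:

    # Sketch only: DagRun.external_trigger.is_(False) expresses the same boolean
    # filter without tripping pycodestyle's E712 check on "== False".
    qry = (
        session.query(func.max(DagRun.execution_date))
        .filter_by(dag_id=dag.dag_id)
        .filter(or_(
            DagRun.external_trigger.is_(False),
            # add % as a wildcard for the like query
            DagRun.run_id.like(f"{DagRunType.SCHEDULED.value}__%"))
        )
    )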
@@ -56,7 +56,7 @@
 )

 create_table = BigQueryCreateEmptyTableOperator(
-    task_id=f"create_table",
+    task_id="create_table",
     dataset_id=DATASET_NAME,
     table_id=TABLE,
     schema_fields=[
@@ -47,7 +47,7 @@ def safe_name(s: str) -> str:
 default_args = {"start_date": days_ago(1)}

 with models.DAG(
-    dag_id=f"example_presto_to_gcs",
+    dag_id="example_presto_to_gcs",
     default_args=default_args,
     schedule_interval=None,  # Override to match your needs
     tags=["example"],
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/functions.py
@@ -147,7 +147,7 @@ def upload_function_zip(self, location: str, zip_path: str, project_id: str) ->
         """
         response = \
             self.get_conn().projects().locations().functions().generateUploadUrl(  # pylint: disable=no-member # noqa
-                parent=self._full_location(project_id, location)
+                parent=self._full_location(project_id, location)
             ).execute(num_retries=self.num_retries)

         upload_url = response.get('uploadUrl')
11 changes: 6 additions & 5 deletions airflow/providers/qubole/operators/qubole_check.py
@@ -229,10 +229,11 @@ def handle_airflow_exception(airflow_exception, hook):
     if cmd.is_success(cmd.status):
         qubole_command_results = hook.get_query_results()
         qubole_command_id = cmd.id
-        exception_message = '\nQubole Command Id: {qubole_command_id}' \
-                            '\nQubole Command Results:' \
-                            '\n{qubole_command_results}'.format(
-            qubole_command_id=qubole_command_id,  # noqa: E122
-            qubole_command_results=qubole_command_results)
+        exception_message = \
+            '\nQubole Command Id: {qubole_command_id}' \
+            '\nQubole Command Results:' \
+            '\n{qubole_command_results}'.format(
+                qubole_command_id=qubole_command_id,
+                qubole_command_results=qubole_command_results)
         raise AirflowException(str(airflow_exception) + exception_message)
     raise AirflowException(str(airflow_exception))
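The new layout above keeps the .format() call and only re-indents it so the # noqa: E122 marker is no longer needed. Since this code already targets Python 3, the same message could also be assembled with implicit string concatenation and f-strings, dropping the backslashes entirely; a sketch for comparison, not what the commit does:

    # Sketch only: f-string variant of the exception message built above.
    exception_message = (
        f'\nQubole Command Id: {qubole_command_id}'
        '\nQubole Command Results:'
        f'\n{qubole_command_results}'
    )
    raise AirflowException(str(airflow_exception) + exception_message)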
2 changes: 1 addition & 1 deletion airflow/secrets/local_filesystem.py
@@ -139,7 +139,7 @@ def _parse_secret_file(file_path: str) -> Dict[str, Any]:

     if parse_errors:
         raise AirflowFileParseException(
-            f"Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
+            "Failed to load the secret file.", file_path=file_path, parse_errors=parse_errors
         )

     return secrets
10 changes: 5 additions & 5 deletions airflow/www/views.py
@@ -1673,11 +1673,11 @@ def duration(self, session=None):
         TF = TaskFail
         ti_fails = (
             session.query(TF)
-            .filter(TF.dag_id == dag.dag_id,
-                    TF.execution_date >= min_date,
-                    TF.execution_date <= base_date,
-                    TF.task_id.in_([t.task_id for t in dag.tasks]))
-            .all()  # noqa
+            .filter(TF.dag_id == dag.dag_id,
+                    TF.execution_date >= min_date,
+                    TF.execution_date <= base_date,
+                    TF.task_id.in_([t.task_id for t in dag.tasks]))
+            .all()
         )

         fails_totals = defaultdict(int)
2 changes: 1 addition & 1 deletion backport_packages/setup_backport_packages.py
@@ -416,7 +416,7 @@ def usage():
     for package in packages:
         out += f"{package} "
     out_array = textwrap.wrap(out, 80)
-    print(f"Available packages: ")
+    print("Available packages: ")
     print()
     for text in out_array:
         print(text)
4 changes: 2 additions & 2 deletions dev/send_email.py
@@ -38,8 +38,8 @@
 SMTP_PORT = 587
 SMTP_SERVER = "mail-relay.apache.org"
 MAILING_LIST = {
-    "dev": f"dev@airflow.apache.org",
-    "users": f"users@airflow.apache.org"
+    "dev": "dev@airflow.apache.org",
+    "users": "users@airflow.apache.org"
 }


4 changes: 2 additions & 2 deletions scripts/ci/pre_commit_yaml_to_cfg.py
@@ -93,7 +93,7 @@ def write_config(yaml_config_file_path: str, default_cfg_file_path: str):
             configfile.write("\n")
             for single_line_desc in section_description:
                 if single_line_desc == "":
-                    configfile.write(f"#\n")
+                    configfile.write("#\n")
                 else:
                     configfile.write(f"# {single_line_desc}\n")

@@ -108,7 +108,7 @@ def write_config(yaml_config_file_path: str, default_cfg_file_path: str):
             configfile.write("\n")
             for single_line_desc in option_description:
                 if single_line_desc == "":
-                    configfile.write(f"#\n")
+                    configfile.write("#\n")
                 else:
                     configfile.write(f"# {single_line_desc}\n")

2 changes: 1 addition & 1 deletion scripts/list-integrations.py
@@ -68,7 +68,7 @@ def _find_clazzes(directory, base_class):

 prog = "./" + os.path.basename(sys.argv[0])

-HELP = f"""\
+HELP = """\
 List operators, hooks, sensors, secrets backend in the installed Airflow.
 You can combine this script with other tools e.g. awk, grep, cut, uniq, sort.
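F541 applies to multi-line literals too: the module-level HELP text above interpolates nothing, so the f prefix comes off even though the string spans several lines. The prefix would only be warranted once a placeholder appears, for example (hypothetical, not from this script):

    # Hypothetical: an interpolating variant that genuinely needs the f prefix,
    # reusing the `prog` variable defined just above.
    HELP = f"""\
    Usage: {prog} [options]

    List operators, hooks, sensors, secrets backend in the installed Airflow.
    """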
2 changes: 1 addition & 1 deletion tests/executors/test_executor_loader.py
@@ -71,7 +71,7 @@ def test_should_support_plugins(self):

     def test_should_support_custom_path(self):
         with conf_vars({
-            ("core", "executor"): f"tests.executors.test_executor_loader.FakeExecutor"
+            ("core", "executor"): "tests.executors.test_executor_loader.FakeExecutor"
         }):
             executor = ExecutorLoader.get_default_executor()
             self.assertIsNotNone(executor)
2 changes: 1 addition & 1 deletion tests/models/test_serialized_dag.py
@@ -120,7 +120,7 @@ def test_remove_stale_dags(self):
     @mock.patch('airflow.models.serialized_dag.STORE_SERIALIZED_DAGS', True)
     def test_bulk_sync_to_db(self):
         dags = [
-            DAG(f"dag_1"), DAG(f"dag_2"), DAG(f"dag_3"),
+            DAG("dag_1"), DAG("dag_2"), DAG("dag_3"),
         ]
         with assert_queries_count(7):
             SDM.bulk_sync_to_db(dags)
