Refactor shorter defaults in providers (#34347)
eumiro committed Sep 14, 2023
1 parent a122b57 commit 8ecd576
Showing 22 changed files with 28 additions and 30 deletions.
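
The pattern throughout the commit is the same: a default written as `x if x else y` (or `x if x is not None else y`) is shortened to `x or y`, which returns the left operand when it is truthy and the right operand otherwise. A minimal standalone sketch of the idea, using an illustrative helper rather than code from the diff:

from tempfile import gettempdir


def pick_local_dir(local_path: str | None = None) -> str:
    # Verbose form used before the refactor.
    verbose = local_path if local_path else gettempdir()
    # Short-circuit form used after the refactor: `or` returns local_path
    # when it is a non-empty string and falls back to gettempdir() otherwise.
    short = local_path or gettempdir()
    assert verbose == short
    return short


pick_local_dir()          # system temp dir, e.g. '/tmp'
pick_local_dir("/data")   # '/data'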
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/hooks/s3.py
@@ -1375,7 +1375,7 @@ def download_file(
raise e

if preserve_file_name:
-local_dir = local_path if local_path else gettempdir()
+local_dir = local_path or gettempdir()
subdir = f"airflow_tmp_dir_{uuid4().hex[0:8]}" if use_autogenerated_subdir else ""
filename_in_s3 = s3_obj.key.rsplit("/", 1)[-1]
file_path = Path(local_dir, subdir, filename_in_s3)
6 changes: 3 additions & 3 deletions airflow/providers/amazon/aws/operators/datasync.py
@@ -151,16 +151,16 @@ def __init__(
self.allow_random_task_choice = allow_random_task_choice
self.allow_random_location_choice = allow_random_location_choice

-self.create_task_kwargs = create_task_kwargs if create_task_kwargs else {}
+self.create_task_kwargs = create_task_kwargs or {}
self.create_source_location_kwargs = {}
if create_source_location_kwargs:
self.create_source_location_kwargs = create_source_location_kwargs
self.create_destination_location_kwargs = {}
if create_destination_location_kwargs:
self.create_destination_location_kwargs = create_destination_location_kwargs

-self.update_task_kwargs = update_task_kwargs if update_task_kwargs else {}
-self.task_execution_kwargs = task_execution_kwargs if task_execution_kwargs else {}
+self.update_task_kwargs = update_task_kwargs or {}
+self.task_execution_kwargs = task_execution_kwargs or {}
self.delete_task_after_execution = delete_task_after_execution

# Validations
2 changes: 1 addition & 1 deletion airflow/providers/amazon/aws/transfers/azure_blob_to_s3.py
@@ -137,7 +137,7 @@ def execute(self, context: Context) -> list[str]:
# parent directories/keys
existing_files = s3_hook.list_keys(bucket_name, prefix=prefix)
# in case that no files exists, return an empty array to avoid errors
-existing_files = existing_files if existing_files is not None else []
+existing_files = existing_files or []
# remove the prefix for the existing files to allow the match
existing_files = [file.replace(f"{prefix}/", "", 1) for file in existing_files]
files = list(set(files) - set(existing_files))
4 changes: 2 additions & 2 deletions airflow/providers/amazon/aws/transfers/gcs_to_s3.py
@@ -193,12 +193,12 @@ def execute(self, context: Context) -> list[str]:
# filter all the objects (return empty list) instead of empty
# prefix returning all the objects
if prefix:
-prefix = prefix if prefix.endswith("/") else f"{prefix}/"
+prefix = prefix.rstrip("/") + "/"
# look for the bucket and the prefix to avoid look into
# parent directories/keys
existing_files = s3_hook.list_keys(bucket_name, prefix=prefix)
# in case that no files exists, return an empty array to avoid errors
-existing_files = existing_files if existing_files is not None else []
+existing_files = existing_files or []
# remove the prefix for the existing files to allow the match
existing_files = [file.replace(prefix, "", 1) for file in existing_files]
gcs_files = list(set(gcs_files) - set(existing_files))
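
For the gcs_to_s3 prefix handling, the new form also normalizes the separator: appending "/" only when it is missing keeps any extra trailing slashes, whereas rstrip("/") + "/" always leaves exactly one. A small standalone sketch of that difference, with an illustrative prefix value:

prefix = "logs//"
appended = prefix if prefix.endswith("/") else f"{prefix}/"  # 'logs//' (left unchanged)
normalized = prefix.rstrip("/") + "/"                        # 'logs/'
# For the common case of a prefix like 'logs', both forms produce 'logs/'.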
2 changes: 1 addition & 1 deletion airflow/providers/apache/pig/hooks/pig.py
@@ -51,7 +51,7 @@ def __init__(
" PigOperator. You can also pass ``pig-properties`` in the PigCliHook `init`. Currently,"
f" the {pig_cli_conn_id} connection has those extras: `{conn_pig_properties}`."
)
-self.pig_properties = pig_properties if pig_properties else []
+self.pig_properties = pig_properties or []
self.conn = conn
self.sub_process = None

2 changes: 1 addition & 1 deletion airflow/providers/cncf/kubernetes/operators/pod.py
@@ -371,7 +371,7 @@ def __init__(
self.get_logs = get_logs
self.container_logs = container_logs
if self.container_logs == KubernetesPodOperator.BASE_CONTAINER_NAME:
-self.container_logs = base_container_name if base_container_name else self.BASE_CONTAINER_NAME
+self.container_logs = base_container_name or self.BASE_CONTAINER_NAME
self.image_pull_policy = image_pull_policy
self.node_selector = node_selector or {}
self.annotations = annotations or {}
2 changes: 1 addition & 1 deletion airflow/providers/elasticsearch/hooks/elasticsearch.py
@@ -169,7 +169,7 @@ class ElasticsearchPythonHook(BaseHook):
def __init__(self, hosts: list[Any], es_conn_args: dict | None = None):
super().__init__()
self.hosts = hosts
-self.es_conn_args = es_conn_args if es_conn_args else {}
+self.es_conn_args = es_conn_args or {}

def _get_elastic_connection(self):
"""Returns the Elasticsearch client."""
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/hooks/bigquery.py
@@ -123,7 +123,7 @@ def __init__(
self.location = location
self.priority = priority
self.running_job_id: str | None = None
-self.api_resource_configs: dict = api_resource_configs if api_resource_configs else {}
+self.api_resource_configs: dict = api_resource_configs or {}
self.labels = labels
self.credentials_path = "bigquery_hook_credentials.json"

@@ -2372,7 +2372,7 @@ def __init__(
self.use_legacy_sql = use_legacy_sql
if api_resource_configs:
_validate_value("api_resource_configs", api_resource_configs, dict)
-self.api_resource_configs: dict = api_resource_configs if api_resource_configs else {}
+self.api_resource_configs: dict = api_resource_configs or {}
self.running_job_id: str | None = None
self.location = location
self.num_retries = num_retries
4 changes: 1 addition & 3 deletions airflow/providers/google/cloud/hooks/cloud_sql.py
@@ -498,9 +498,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id
self.command_line_parameters: list[str] = []
self.cloud_sql_proxy_socket_directory = self.path_prefix
-self.sql_proxy_path = (
-    sql_proxy_binary_path if sql_proxy_binary_path else self.path_prefix + "_cloud_sql_proxy"
-)
+self.sql_proxy_path = sql_proxy_binary_path or f"{self.path_prefix}_cloud_sql_proxy"
self.credentials_path = self.path_prefix + "_credentials.json"
self._build_command_line_parameters()

2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/dataproc_metastore.py
@@ -228,7 +228,7 @@ def create_service(
request={
"parent": parent,
"service_id": service_id,
"service": service if service else {},
"service": service or {},
"request_id": request_id,
},
retry=retry,
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/operators/bigquery.py
@@ -1983,7 +1983,7 @@ def __init__(
self.project_id = project_id
self.location = location
self.gcp_conn_id = gcp_conn_id
-self.dataset_reference = dataset_reference if dataset_reference else {}
+self.dataset_reference = dataset_reference or {}
self.impersonation_chain = impersonation_chain
if exists_ok is not None:
warnings.warn(
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/operators/dataproc.py
@@ -1052,10 +1052,10 @@ def __init__(
self.dataproc_jars = dataproc_jars
self.region = region

-self.job_error_states = job_error_states if job_error_states is not None else {"ERROR"}
+self.job_error_states = job_error_states or {"ERROR"}
self.impersonation_chain = impersonation_chain
self.hook = DataprocHook(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain)
-self.project_id = self.hook.project_id if project_id is None else project_id
+self.project_id = project_id or self.hook.project_id
self.job_template: DataProcJobBuilder | None = None
self.job: dict | None = None
self.dataproc_job_id = None
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/sensors/gcs.py
@@ -421,7 +421,7 @@ def __init__(
raise ValueError("inactivity_period must be non-negative")
self.inactivity_period = inactivity_period
self.min_objects = min_objects
-self.previous_objects = previous_objects if previous_objects else set()
+self.previous_objects = previous_objects or set()
self.inactivity_seconds = 0
self.allow_delete = allow_delete
self.google_cloud_conn_id = google_cloud_conn_id
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/mssql_to_gcs.py
@@ -72,7 +72,7 @@ def __init__(
):
super().__init__(**kwargs)
self.mssql_conn_id = mssql_conn_id
-self.bit_fields = bit_fields if bit_fields else []
+self.bit_fields = bit_fields or []

def query(self):
"""
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -293,12 +293,12 @@ def _write_local_data_files(self, cursor):
if self.export_format == "csv":
row = self.convert_types(schema, col_type_dict, row)
if self.null_marker is not None:
-row = [value if value is not None else self.null_marker for value in row]
+row = [value or self.null_marker for value in row]
csv_writer.writerow(row)
elif self.export_format == "parquet":
row = self.convert_types(schema, col_type_dict, row)
if self.null_marker is not None:
-row = [value if value is not None else self.null_marker for value in row]
+row = [value or self.null_marker for value in row]
rows_buffer.append(row)
if len(rows_buffer) >= self.parquet_row_group_size:
self._write_rows_to_parquet(parquet_writer, rows_buffer)
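
Worth noting when reading the sql_to_gcs hunk: `value or self.null_marker` and `value if value is not None else self.null_marker` agree only when row values are never falsy; with `or`, zeros, empty strings, and False are also replaced by the marker. A small standalone example of the difference, with illustrative values:

null_marker = "NULL"
row = [None, 0, "", "x"]
[v or null_marker for v in row]                     # ['NULL', 'NULL', 'NULL', 'x']
[v if v is not None else null_marker for v in row]  # ['NULL', 0, '', 'x']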
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/triggers/gcs.py
@@ -328,7 +328,7 @@ def __init__(
)
self.inactivity_period = inactivity_period
self.min_objects = min_objects
-self.previous_objects = previous_objects if previous_objects else set()
+self.previous_objects = previous_objects or set()
self.inactivity_seconds = 0.0
self.allow_delete = allow_delete
self.last_activity_time: datetime | None = None
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/utils/bigquery.py
@@ -48,7 +48,7 @@ def convert_job_id(job_id: str | list[str], project_id: str, location: str | Non
:param job_id: Required. The ID of the job.
:return: str or list[str] of project_id:location:job_id.
"""
-location = location if location else "US"
+location = location or "US"
if isinstance(job_id, list):
return [f"{project_id}:{location}:{i}" for i in job_id]
else:
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/utils/dataform.py
@@ -153,7 +153,7 @@ def make_initialization_workspace_flow(
contents=dataform_config_content,
)

-package_name = package_name if package_name else workspace_id
+package_name = package_name or workspace_id
package_json_content = json.dumps(
{
"name": package_name,
2 changes: 1 addition & 1 deletion airflow/providers/grpc/hooks/grpc.py
@@ -76,7 +76,7 @@ def __init__(
self.grpc_conn_id = grpc_conn_id
self.conn = self.get_connection(self.grpc_conn_id)
self.extras = self.conn.extra_dejson
-self.interceptors = interceptors if interceptors else []
+self.interceptors = interceptors or []
self.custom_connection_func = custom_connection_func

def get_conn(self) -> grpc.Channel:
@@ -147,7 +147,7 @@ def __init__(
if not radius_secret:
raise VaultError("The 'radius' authentication type requires 'radius_secret'")

-self.kv_engine_version = kv_engine_version if kv_engine_version else 2
+self.kv_engine_version = kv_engine_version or 2
self.url = url
self.auth_type = auth_type
self.kwargs = kwargs
2 changes: 1 addition & 1 deletion airflow/providers/oracle/hooks/oracle.py
@@ -340,7 +340,7 @@ def bulk_insert_rows(
if self.supports_autocommit:
self.set_autocommit(conn, False)
cursor = conn.cursor() # type: ignore[attr-defined]
-values_base = target_fields if target_fields else rows[0]
+values_base = target_fields or rows[0]
prepared_stm = "insert into {tablename} {columns} values ({values})".format(
tablename=table,
columns="({})".format(", ".join(target_fields)) if target_fields else "",
2 changes: 1 addition & 1 deletion airflow/providers/redis/log/redis_task_handler.py
@@ -64,7 +64,7 @@ def __init__(
self.handler: _RedisHandler | None = None
self.max_lines = max_lines
self.ttl_seconds = ttl_seconds
-self.conn_id = conn_id if conn_id is not None else conf.get("logging", "REMOTE_LOG_CONN_ID")
+self.conn_id = conn_id or conf.get("logging", "REMOTE_LOG_CONN_ID")

@cached_property
def conn(self):
