Skip to content

Commit

Permalink
Add optional location to bigquery data transfer service (#15088) (#20221)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bradley Bonitatibus authored Dec 12, 2021
1 parent bc76126 commit 98514cc
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
23 changes: 20 additions & 3 deletions airflow/providers/google/cloud/hooks/bigquery_dts.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
delegate_to: Optional[str] = None,
location: Optional[str] = None,
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
) -> None:
super().__init__(
gcp_conn_id=gcp_conn_id,
delegate_to=delegate_to,
impersonation_chain=impersonation_chain,
)
self.location = location

@staticmethod
def _disable_auto_scheduling(config: Union[dict, TransferConfig]) -> TransferConfig:
Expand Down Expand Up @@ -133,6 +135,9 @@ def create_transfer_config(
"""
client = self.get_conn()
parent = f"projects/{project_id}"
if self.location:
parent = f"{parent}/locations/{self.location}"

return client.create_transfer_config(
request={
'parent': parent,
Expand Down Expand Up @@ -174,7 +179,11 @@ def delete_transfer_config(
:return: None
"""
client = self.get_conn()
name = f"projects/{project_id}/transferConfigs/{transfer_config_id}"
project = f"projects/{project_id}"
if self.location:
project = f"{project}/locations/{self.location}"

name = f"{project}/transferConfigs/{transfer_config_id}"
return client.delete_transfer_config(
request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ()
)
Expand Down Expand Up @@ -223,7 +232,11 @@ def start_manual_transfer_runs(
:return: An ``google.cloud.bigquery_datatransfer_v1.types.StartManualTransferRunsResponse`` instance.
"""
client = self.get_conn()
parent = f"projects/{project_id}/transferConfigs/{transfer_config_id}"
project = f"projects/{project_id}"
if self.location:
project = f"{project}/locations/{self.location}"

parent = f"{project}/transferConfigs/{transfer_config_id}"
return client.start_manual_transfer_runs(
request={
'parent': parent,
Expand Down Expand Up @@ -268,7 +281,11 @@ def get_transfer_run(
:return: An ``google.cloud.bigquery_datatransfer_v1.types.TransferRun`` instance.
"""
client = self.get_conn()
name = f"projects/{project_id}/transferConfigs/{transfer_config_id}/runs/{run_id}"
project = f"projects/{project_id}"
if self.location:
project = f"{project}/locations/{self.location}"

name = f"{project}/transferConfigs/{transfer_config_id}/runs/{run_id}"
return client.get_transfer_run(
request={'name': name}, retry=retry, timeout=timeout, metadata=metadata or ()
)
18 changes: 15 additions & 3 deletions airflow/providers/google/cloud/operators/bigquery_dts.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class BigQueryCreateDataTransferOperator(BaseOperator):
created. If set to None or missing, the default project_id from the Google Cloud connection
is used.
:type project_id: str
:param location: BigQuery Transfer Service location for regional transfers.
:type location: Optional[str]
:param authorization_code: authorization code to use with this transfer configuration.
This is required if new credentials are needed.
:type authorization_code: Optional[str]
Expand Down Expand Up @@ -77,6 +79,7 @@ def __init__(
*,
transfer_config: dict,
project_id: Optional[str] = None,
location: Optional[str] = None,
authorization_code: Optional[str] = None,
retry: Retry = None,
timeout: Optional[float] = None,
Expand All @@ -89,6 +92,7 @@ def __init__(
self.transfer_config = transfer_config
self.authorization_code = authorization_code
self.project_id = project_id
self.location = location
self.retry = retry
self.timeout = timeout
self.metadata = metadata
Expand All @@ -97,7 +101,7 @@ def __init__(

def execute(self, context):
hook = BiqQueryDataTransferServiceHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, location=self.location
)
self.log.info("Creating DTS transfer config")
response = hook.create_transfer_config(
Expand Down Expand Up @@ -127,6 +131,8 @@ class BigQueryDeleteDataTransferConfigOperator(BaseOperator):
:param project_id: The BigQuery project id where the transfer configuration should be
created. If set to None or missing, the default project_id from the Google Cloud connection is used.
:type project_id: str
:param location: BigQuery Transfer Service location for regional transfers.
:type location: Optional[str]
:param retry: A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: Optional[google.api_core.retry.Retry]
Expand Down Expand Up @@ -161,6 +167,7 @@ def __init__(
*,
transfer_config_id: str,
project_id: Optional[str] = None,
location: Optional[str] = None,
retry: Retry = None,
timeout: Optional[float] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = None,
Expand All @@ -170,6 +177,7 @@ def __init__(
) -> None:
super().__init__(**kwargs)
self.project_id = project_id
self.location = location
self.transfer_config_id = transfer_config_id
self.retry = retry
self.timeout = timeout
Expand All @@ -179,7 +187,7 @@ def __init__(

def execute(self, context) -> None:
hook = BiqQueryDataTransferServiceHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, location=self.location
)
hook.delete_transfer_config(
transfer_config_id=self.transfer_config_id,
Expand Down Expand Up @@ -215,6 +223,8 @@ class BigQueryDataTransferServiceStartTransferRunsOperator(BaseOperator):
:param project_id: The BigQuery project id where the transfer configuration should be
created. If set to None or missing, the default project_id from the Google Cloud connection is used.
:type project_id: str
:param location: BigQuery Transfer Service location for regional transfers.
:type location: Optional[str]
:param retry: A retry object used to retry requests. If `None` is
specified, requests will not be retried.
:type retry: Optional[google.api_core.retry.Retry]
Expand Down Expand Up @@ -251,6 +261,7 @@ def __init__(
*,
transfer_config_id: str,
project_id: Optional[str] = None,
location: Optional[str] = None,
requested_time_range: Optional[dict] = None,
requested_run_time: Optional[dict] = None,
retry: Retry = None,
Expand All @@ -262,6 +273,7 @@ def __init__(
) -> None:
super().__init__(**kwargs)
self.project_id = project_id
self.location = location
self.transfer_config_id = transfer_config_id
self.requested_time_range = requested_time_range
self.requested_run_time = requested_run_time
Expand All @@ -273,7 +285,7 @@ def __init__(

def execute(self, context):
hook = BiqQueryDataTransferServiceHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, location=self.location
)
self.log.info('Submitting manual transfer for %s', self.transfer_config_id)
response = hook.start_manual_transfer_runs(
Expand Down

0 comments on commit 98514cc

Please sign in to comment.