Skip to content

Commit

Permalink
Support impersonation_chain parameter in the GKEStartPodOperator (#19518
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Łukasz Wyszomirski committed Nov 18, 2021
1 parent 2976070 commit 952ef90
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 1 deletion.
27 changes: 27 additions & 0 deletions airflow/providers/google/cloud/operators/kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,15 @@ class GKEStartPodOperator(KubernetesPodOperator):
:param gcp_conn_id: The google cloud connection id to use. This allows for
users to specify a service account.
:type gcp_conn_id: str
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
:type impersonation_chain: Union[str, Sequence[str]]
"""

template_fields = {'project_id', 'location', 'cluster_name'} | set(KubernetesPodOperator.template_fields)
Expand All @@ -297,6 +306,7 @@ def __init__(
use_internal_ip: bool = False,
project_id: Optional[str] = None,
gcp_conn_id: str = 'google_cloud_default',
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -305,6 +315,7 @@ def __init__(
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.use_internal_ip = use_internal_ip
self.impersonation_chain = impersonation_chain

if self.gcp_conn_id is None:
raise AirflowException(
Expand Down Expand Up @@ -350,6 +361,22 @@ def execute(self, context) -> Optional[str]:
"--project",
self.project_id,
]
if self.impersonation_chain:
if isinstance(self.impersonation_chain, str):
impersonation_account = self.impersonation_chain
elif len(self.impersonation_chain) == 1:
impersonation_account = self.impersonation_chain[:-1]
else:
raise AirflowException(
"Chained list of accounts is not supported, please specify only one service account"
)

cmd.extend(
[
'--impersonate-service-account',
impersonation_account,
]
)
if self.use_internal_ip:
cmd.append('--internal-ip')
execute_in_subprocess(cmd)
Expand Down
1 change: 0 additions & 1 deletion docs/apache-airflow-providers-google/connections/gcp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ access token, which will allow to act on its behalf using its permissions. ``imp
does not even need to have a generated key.

.. warning::
:class:`~airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator`,
:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator` and
:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`
do not support direct impersonation as of now.
Expand Down
44 changes: 44 additions & 0 deletions tests/providers/google/cloud/operators/test_kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,47 @@ def test_execute_with_internal_ip(
)

assert self.gke_op.config_file == FILE_NAME

@mock.patch.dict(os.environ, {})
@mock.patch(
"airflow.hooks.base.BaseHook.get_connections",
return_value=[
Connection(
extra=json.dumps(
{"extra__google_cloud_platform__keyfile_dict": '{"private_key": "r4nd0m_k3y"}'}
)
)
],
)
@mock.patch('airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.execute')
@mock.patch('airflow.providers.google.cloud.operators.kubernetes_engine.GoogleBaseHook')
@mock.patch('airflow.providers.google.cloud.operators.kubernetes_engine.execute_in_subprocess')
@mock.patch('tempfile.NamedTemporaryFile')
def test_execute_with_impersonation_service_account(
self, file_mock, mock_execute_in_subprocess, mock_gcp_hook, exec_mock, get_con_mock
):
type(file_mock.return_value.__enter__.return_value).name = PropertyMock(
side_effect=[FILE_NAME, '/path/to/new-file']
)
self.gke_op.impersonation_service_account = "test_account@example.com"
self.gke_op.execute(None)

mock_gcp_hook.return_value.provide_authorized_gcloud.assert_called_once()

mock_execute_in_subprocess.assert_called_once_with(
[
'gcloud',
'container',
'clusters',
'get-credentials',
CLUSTER_NAME,
'--zone',
PROJECT_LOCATION,
'--project',
TEST_GCP_PROJECT_ID,
'--impersonate-service-account',
'test_account@example.com',
]
)

assert self.gke_op.config_file == FILE_NAME

0 comments on commit 952ef90

Please sign in to comment.
  翻译: