Skip to content

Commit

Permalink
Allow omission of initial_node_count if node_pools is specified (#…
Browse files Browse the repository at this point in the history
…17820)

* Add error check for config_file parameter

* Move error check to init

* fix static checks

* Apply suggestions from code review

* Apply suggestions from code review

* Modify validation to allow node_pools

* WIP: pass test cases 0, 1, 3

* WIP: pass test case 4

* WIP: pass test case 2

* WIP - pass test cases 5 and 6, remove trailing comma

* WIP - pass test case 7

* WIP pass test case 8

* cleanup

* fix static check

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
  • Loading branch information
leahecole and kaxil authored Aug 27, 2021
1 parent 83f1f07 commit 87769db
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 7 deletions.
28 changes: 24 additions & 4 deletions airflow/providers/google/cloud/operators/kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,35 @@ def __init__(
self._check_input()

def _check_input(self) -> None:
if not all([self.project_id, self.location, self.body]) or not (
(isinstance(self.body, dict) and "name" in self.body and "initial_node_count" in self.body)
or (getattr(self.body, "name", None) and getattr(self.body, "initial_node_count", None))
if (
not all([self.project_id, self.location, self.body])
or (isinstance(self.body, dict) and not ("name" in self.body))
or (
isinstance(self.body, dict)
and ("initial_node_count" not in self.body and "node_pools" not in self.body)
)
or (not (isinstance(self.body, dict)) and not (getattr(self.body, "name", None)))
or (
not (isinstance(self.body, dict))
and (
not (getattr(self.body, "initial_node_count", None))
and not (getattr(self.body, "node_pools", None))
)
)
):
self.log.error(
"One of (project_id, location, body, body['name'], "
"body['initial_node_count']) is missing or incorrect"
"body['initial_node_count']), body['node_pools'] is missing or incorrect"
)
raise AirflowException("Operator has incorrect or missing input.")
elif (
isinstance(self.body, dict) and ("initial_node_count" in self.body and "node_pools" in self.body)
) or (
not (isinstance(self.body, dict))
and (getattr(self.body, "initial_node_count", None) and getattr(self.body, "node_pools", None))
):
self.log.error("Only one of body['initial_node_count']) and body['node_pools'] may be specified")
raise AirflowException("Operator has incorrect or missing input.")

def execute(self, context) -> str:
hook = GKEHook(
Expand Down
59 changes: 56 additions & 3 deletions tests/providers/google/cloud/operators/test_kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,17 @@

PROJECT_BODY = {'name': 'test-name'}
PROJECT_BODY_CREATE_DICT = {'name': 'test-name', 'initial_node_count': 1}
PROJECT_BODY_CREATE_DICT_NODE_POOLS = {
'name': 'test-name',
'node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}],
}

PROJECT_BODY_CREATE_CLUSTER = type("Cluster", (object,), {"name": "test-name", "initial_node_count": 1})()
PROJECT_BODY_CREATE_CLUSTER_NODE_POOLS = type(
'Cluster',
(object,),
{'name': 'test-name', 'node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}]},
)()

TASK_NAME = 'test-task-name'
NAMESPACE = ('default',)
Expand All @@ -52,7 +62,15 @@


class TestGoogleCloudPlatformContainerOperator(unittest.TestCase):
@parameterized.expand((body,) for body in [PROJECT_BODY_CREATE_DICT, PROJECT_BODY_CREATE_CLUSTER])
@parameterized.expand(
(body,)
for body in [
PROJECT_BODY_CREATE_DICT,
PROJECT_BODY_CREATE_DICT_NODE_POOLS,
PROJECT_BODY_CREATE_CLUSTER,
PROJECT_BODY_CREATE_CLUSTER_NODE_POOLS,
]
)
@mock.patch('airflow.providers.google.cloud.operators.kubernetes_engine.GKEHook')
def test_create_execute(self, body, mock_hook):
operator = GKECreateClusterOperator(
Expand All @@ -69,9 +87,44 @@ def test_create_execute(self, body, mock_hook):
for body in [
None,
{'missing_name': 'test-name', 'initial_node_count': 1},
{'name': 'test-name', 'missing_initial_node_count': 1},
{
'name': 'test-name',
'initial_node_count': 1,
'node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}],
},
{'missing_name': 'test-name', 'node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}]},
{
'name': 'test-name',
'missing_initial_node_count': 1,
'missing_node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}],
},
type('Cluster', (object,), {'missing_name': 'test-name', 'initial_node_count': 1})(),
type('Cluster', (object,), {'name': 'test-name', 'missing_initial_node_count': 1})(),
type(
'Cluster',
(object,),
{
'missing_name': 'test-name',
'node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}],
},
)(),
type(
'Cluster',
(object,),
{
'name': 'test-name',
'missing_initial_node_count': 1,
'missing_node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}],
},
)(),
type(
'Cluster',
(object,),
{
'name': 'test-name',
'initial_node_count': 1,
'node_pools': [{'name': 'a_node_pool', 'initial_node_count': 1}],
},
)(),
]
)
@mock.patch('airflow.providers.google.cloud.operators.kubernetes_engine.GKEHook')
Expand Down

0 comments on commit 87769db

Please sign in to comment.
  翻译: