Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
In questa pagina viene descritto come utilizzare KubernetesPodOperator
per il deployment
Pod Kubernetes
da Cloud Composer a Google Kubernetes Engine
che fa parte del tuo ambiente Cloud Composer e per garantire
che il tuo ambiente disponga delle risorse appropriate.
KubernetesPodOperator
di avvii
Pod Kubernetes
nel cluster del tuo ambiente. In confronto,
Gli operatori di Google Kubernetes Engine eseguono i pod Kubernetes in un
che può essere un cluster separato non correlato
completamente gestito di Google Cloud. Puoi anche creare ed eliminare i cluster utilizzando gli operatori Google Kubernetes Engine.
KubernetesPodOperator
è una buona soluzione se hai bisogno di:
- Dipendenze Python personalizzate non disponibili tramite il repository pubblico PyPI.
- Dipendenze binarie che non sono disponibili nello stock Immagine worker di Cloud Composer.
Questa pagina illustra un esempio di DAG Airflow che include quanto segue
KubernetesPodOperator
configurazioni:
- Configurazione minima: imposta solo i parametri richiesti.
- Configurazione del modello: utilizza parametri che puoi modello con Jinja.
Configurazione delle variabili di secret: passa un oggetto secret Kubernetes al pod.
In Cloud Composer 3, la configurazione dell'affinità dei pod non è disponibile. Invece, utilizzano gli operatori GKE per avviare i pod in un cluster Kubernetes.
Configurazione completa: include tutte le configurazioni.
Prima di iniziare
In Cloud Composer 3, il cluster del tuo ambiente scala automaticamente. Carichi di lavoro aggiuntivi che esegui utilizzando la scalabilità di
KubernetesPodOperator
in modo indipendente dal tuo ambiente. Il tuo ambiente non è influenzato dall'aumento della richiesta di risorse, ma fa lo scale up e lo scale down del cluster del tuo ambiente a seconda della risorsa domanda. I prezzi per i carichi di lavoro aggiuntivi che esegui in che il cluster dell'ambiente segue Modello di prezzi di Cloud Composer 3 e utilizzi SKU di Cloud Composer 3.In Cloud Composer 3, il cluster del tuo ambiente si trova nel progetto tenant. Tuttavia, KubernetesPodOperator funziona allo stesso modo, senza alcuna modifica al codice richiesta rispetto a Cloud Composer 2. I pod vengono eseguiti nel cluster dell'ambiente, in uno spazio dei nomi isolato, ma con accesso alla tua rete VPC (se abilitata).
Cloud Composer 3 utilizza i cluster GKE con Federazione delle identità per i carichi di lavoro per GKE. Per impostazione predefinita, i pod che vengono eseguiti in uno spazio dei nomi appena creato o nello spazio dei nomi
composer-user-workloads
non possono accedere alle risorse Google Cloud. Quando utilizzi la federazione delle identità per i carichi di lavoro per GKE, gli account di servizio Kubernetes associati ai namespace devono essere mappati agli account di servizio Google Cloud per attivare l'autorizzazione dell'identità di servizio per le richieste alle API di Google e ad altri servizi.Per questo motivo, se esegui pod nello spazio dei nomi
composer-user-workloads
uno spazio dei nomi appena creato nel cluster dell'ambiente, Associazioni IAM tra Kubernetes e Google Cloud gli account di servizio non vengono creati e questi pod non possono accedere alle risorse del tuo progetto Google Cloud.Se vuoi che i tuoi pod abbiano accesso alle risorse Google Cloud, quindi utilizza lo spazio dei nomi
composer-user-workloads
o creane uno personalizzato come descritto più dettagliatamente.Per fornire l'accesso alle risorse del tuo progetto, segui le indicazioni in Federazione delle identità per i carichi di lavoro per GKE e imposta le associazioni:
- Crea uno spazio dei nomi separato nel cluster del tuo ambiente.
- Crea un'associazione tra
composer-user-workloads/<namespace_name>
account di servizio Kubernetes e l'account di servizio del tuo ambiente. - Aggiungi l'annotazione dell'account di servizio del tuo ambiente all'account di servizio Kubernetes.
- Quando utilizzi
KubernetesPodOperator
, specifica lo spazio dei nomi e l'account di servizio Kubernetes innamespace
e Parametriservice_account_name
.
Cloud Composer 3 utilizza i cluster GKE con il carico di lavoro Identità. Il server metadati GKE impiega alcuni secondi per iniziare ad accettare richieste su un pod appena creato. Di conseguenza, il tentativo di a eseguire l'autenticazione con Workload Identity entro i primi secondi di una Il ciclo di vita del pod potrebbe interrompersi. Per ulteriori informazioni su questa limitazione, consulta: Restrizioni di Workload Identity.
Cloud Composer 3 utilizza i cluster Autopilot che introducono la nozione di classi di computing:
Per impostazione predefinita, se non è selezionato alcun corso, il corso
general-purpose
sarà dei pod durante la creazione dei pod utilizzandoKubernetesPodOperator
.A ogni classe sono associati limiti di risorse e proprietà specifiche. Puoi trovare informazioni al riguardo nella documentazione di Autopilot. Ad esempio, i pod in esecuzione nella classe
general-purpose
possono utilizzare fino a 110 GiB di memoria.
Configurazione di KubernetesPodOperator
Per seguire questo esempio, inserisci l'intero file kubernetes_pod_operator.py
nella cartella dags/
del tuo ambiente o
aggiungi il codice KubernetesPodOperator
pertinente a un DAG.
Le sezioni seguenti spiegano ciascuna configurazione di KubernetesPodOperator
dell'esempio. Per informazioni su ogni variabile di configurazione,
consulta il riferimento di Airflow.
Configurazione minima
Per creare KubernetesPodOperator
, solo i pod name
, namespace
, dove
eseguire il pod, sono obbligatori image
per l'uso e task_id
.
Quando inserisci il seguente snippet di codice in un DAG, la configurazione utilizza
valori predefiniti in /home/airflow/composer_kube_config
. Non è necessario modificare
codice per completare l'attività pod-ex-minimum
.
Configurazione modello
Airflow supporta
Modelli Jinja.
Devi dichiarare le variabili richieste (task_id
, name
, namespace
,
e image
) con l'operatore. Come mostrato nell'esempio seguente,
modelli tutti gli altri parametri con Jinja, inclusi cmds
, arguments
,
env_vars
e config_file
.
Senza modificare il DAG o l'ambiente, la task ex-kube-templates
non riesce a causa di due errori. I log mostrano che questa attività non va a buon fine perché la variabile appropriata non esiste (my_value
). Il secondo errore, che puoi ottenere dopo aver corretto il primo, indica che l'attività non va a buon fine perché core/kube_config
non viene trovato in config
.
Per correggere entrambi gli errori, segui i passaggi descritti più avanti.
Per impostare my_value
con gcloud
o l'interfaccia utente di Airflow:
Interfaccia utente di Airflow
Nell'interfaccia utente di Airflow 2:
Vai alla UI di Airflow.
Nella barra degli strumenti, seleziona Amministrazione > Variabili.
Nella pagina Elenca variabili, fai clic su Aggiungi un nuovo record.
Nella pagina Aggiungi variabile, inserisci le seguenti informazioni:
- Chiave:
my_value
- Val:
example_value
- Chiave:
Fai clic su Salva.
gcloud
Per Airflow 2, inserisci il comando seguente:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Sostituisci:
ENVIRONMENT
con il nome dell'ambiente.LOCATION
con la regione in cui si trova l'ambiente.
Per fare riferimento a un config_file
personalizzato (un file di configurazione di Kubernetes),
sostituisci l'opzione di configurazione kube_config
Airflow in modo che
configurazione Kubernetes valida:
Sezione | Chiave | Valore |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
Attendi qualche minuto per l'aggiornamento dell'ambiente. Quindi,
esegui di nuovo l'attività ex-kube-templates
e verifica che sia riuscita.
Configurazione completa
Questo esempio mostra tutte le variabili che puoi configurare nel
KubernetesPodOperator
. Non è necessario modificare il codice per
l'attività ex-all-configs
per completare l'operazione.
Per maggiori dettagli su ogni variabile, consulta
Riferimento KubernetesPodOperator
Airflow.
Informazioni sul provider Kubernetes CNCF
GKEStartPodOperator e KubernetesPodOperator sono implementati
Provider apache-airflow-providers-cncf-kubernetes
.
Per le note di rilascio non superate per il provider Kubernetes CNCF, fai riferimento al sito web del provider Kubernetes CNCF.
Versione 6.0.0
Nella versione 6.0.0 del pacchetto Provider Kubernetes CNCF,
la connessione kubernetes_default
viene utilizzata per impostazione predefinita
KubernetesPodOperator
.
Se hai specificato una connessione personalizzata nella versione 5.0.0,
è ancora utilizzato dall'operatore. Per tornare a utilizzare kubernetes_default
potresti voler regolare i DAG di conseguenza.
Versione 5.0.0
Questa versione introduce alcune modifiche non compatibili con le versioni precedenti
rispetto alla versione 4.4.0. Le più importanti riguardano la connessione kubernetes_default
, che non viene utilizzata nella versione 5.0.0.
- La connessione
kubernetes_default
deve essere modificata. Percorso configurazione kube deve essere impostato su/home/airflow/composer_kube_config
(come mostrato nella Figura 1) In alternativa, è necessario aggiungereconfig_file
alla configurazioneKubernetesPodOperator
(come mostrato nel codice seguente esempio).
- Modifica il codice di un'attività utilizzando KubernetesPodOperator nel seguente modo:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Per ulteriori informazioni sulla versione 5.0.0, consulta le note di rilascio del provider Kubernetes CNC.
Risoluzione dei problemi
Suggerimenti per la risoluzione degli errori dei pod
Oltre a controllare i log delle attività nella UI di Airflow, controlla anche i seguenti log:
Output dello scheduler e dei worker di Airflow:
Nella console Google Cloud, vai alla pagina Ambienti.
Segui il link DAG per il tuo ambiente.
Nel bucket del tuo ambiente, sali di un livello.
Esamina i log nel
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
.
Log dettagliati dei pod nella console Google Cloud per i carichi di lavoro GKE. Questi log includono il file YAML di definizione del pod, gli eventi del pod e i dettagli del pod.
Codici di reso diversi da zero se utilizzi anche GKEStartPodOperator
Quando utilizzi KubernetesPodOperator
e GKEStartPodOperator
, il codice di ritorno del punto di contatto del contenitore determina se l'attività è considerata riuscita o meno. I codici di reso diversi da zero indicano un errore.
Un pattern comune quando si utilizzano KubernetesPodOperator
e
GKEStartPodOperator
deve eseguire uno script shell come container
punto di ingresso per raggruppare più operazioni all'interno del container.
Se stai scrivendo uno script di questo tipo, ti consigliamo di includere
Il comando set -e
nella parte superiore dello script
in modo che i comandi con errori nello script terminino lo script
di propagare l'errore all'istanza dell'attività Airflow.
Timeout pod
Il timeout predefinito per KubernetesPodOperator
è 120 secondi, che
può causare dei timeout prima del download di immagini di dimensioni maggiori. Puoi
aumenta il timeout modificando startup_timeout_seconds
durante la creazione del parametro KubernetesPodOperator
.
Quando si verifica il timeout di un pod, il log specifico dell'attività è disponibile in la UI di Airflow. Ad esempio:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
I timeout dei pod possono verificarsi anche Account di servizio Cloud Composer non abbia le autorizzazioni IAM necessarie per eseguire l'attività mano. Per verificarlo, esamina gli errori a livello di pod utilizzando Dashboard di GKE per esaminare i log per particolare carico di lavoro o usare Cloud Logging.
Impossibile stabilire una nuova connessione
L'upgrade automatico è abilitato per impostazione predefinita nei cluster GKE. Se un pool di nodi si trova in un cluster in fase di upgrade, potresti vedere quanto segue errore:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Per verificare se è in corso l'upgrade del cluster, nella console Google Cloud vai alla pagina Cluster Kubernetes e cerca l'icona di caricamento accanto al nome del cluster dell'ambiente.