Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Auf dieser Seite wird beschrieben, wie Sie die KubernetesPodOperator
zum Bereitstellen verwenden
Kubernetes-Pods
von Cloud Composer in die Google Kubernetes Engine
der Teil Ihrer Cloud Composer-Umgebung ist, und stellen Sie sicher,
ob Ihre Umgebung über die
entsprechenden Ressourcen verfügt.
KubernetesPodOperator
Markteinführungen
Kubernetes-Pods
im Cluster Ihrer Umgebung. Im Vergleich dazu
Mit Google Kubernetes Engine-Operatoren werden Kubernetes-Pods in einem bestimmten
Cluster, der ein separater Cluster sein kann, der nicht mit Ihrem
zu verbessern. Sie können Cluster auch mit
Google Kubernetes Engine-Operatoren.
KubernetesPodOperator
ist eine gute Option, wenn Sie Folgendes benötigen:
- Benutzerdefinierte Python-Abhängigkeiten, die nicht über das öffentliche PyPI-Repository verfügbar sind.
- Binäre Abhängigkeiten, die im Cloud Composer-Worker-Image nicht verfügbar sind.
Auf dieser Seite wird ein Beispiel für einen Airflow-DAG beschrieben, der Folgendes enthält:
KubernetesPodOperator
-Konfigurationen:
- Minimalkonfiguration: Legt nur die erforderlichen Parameter fest.
- Vorlagenkonfiguration: Verwendet Parameter, die Sie für Vorlagen mit Jinja verwenden können.
Secret-Variablenkonfiguration: Übergibt ein Kubernetes-Secret-Objekt an den Pod.
Pod-Affinitätskonfiguration: Begrenzt die verfügbaren Knoten, für die Pods geplant werden können.
Vollständige Konfiguration: Umfasst alle Konfigurationen.
Hinweis
- Wir empfehlen die Verwendung der neuesten Version von Cloud Composer. Diese Version muss mindestens im Rahmen der Einstellungs- und Supportrichtlinie unterstützt werden.
- Es müssen genügend Ressourcen für die Umgebung verfügbar sein. Das Starten von Pods in einer Umgebung mit zu wenigen Ressourcen kann zu Airflow-Worker- und Airflow-Planer-Fehler führen.
Cloud Composer-Umgebungsressourcen einrichten
Wenn Sie eine Cloud Composer-Umgebung erstellen, geben Sie deren Leistungsparameter an, einschließlich der Leistungsparameter für den Cluster der Umgebung. Das Starten von Kubernetes-Pods im Umgebungscluster kann zu Konflikten bei Clusterressourcen wie CPU oder Arbeitsspeicher führen. Da sich der Airflow-Planer und die Airflow-Worker im selben GKE-Cluster befinden, funktionieren Planer und Worker nicht ordnungsgemäß, wenn diese Konkurrenzsituation einen Ressourcenmangel verursacht.
Führen Sie eine oder mehrere der folgenden Aktionen aus, um einen Ressourcenmangel zu verhindern:
- (Empfohlen) Knotenpool erstellen
- Anzahl der Knoten in der Umgebung erhöhen
- Entsprechenden Maschinentyp angeben
Knotenpool erstellen
Die bevorzugte Methode zum Verhindern eines Ressourcenmangels in der Cloud Composer-Umgebung besteht darin, einen neuen Knotenpool zu erstellen und Kubernetes-Pods so zu konfigurieren, dass sie bei der Ausführung nur Ressourcen aus diesem Pool verwenden.
Console
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Klicken Sie auf den Namen Ihrer Umgebung.
Wechseln Sie auf der Seite Umgebungsdetails zum Tab Umgebungskonfiguration.
Gehen Sie zu Ressourcen > GKE-Cluster. klicken Sie auf den Link Clusterdetails anzeigen.
Erstellen Sie einen Knotenpool, wie unter Knotenpool hinzufügen beschrieben.
gcloud
Bestimmen Sie den Namen des Umgebungsclusters:
gcloud composer environments describe ENVIRONMENT_NAME \ --location LOCATION \ --format="value(config.gkeCluster)"
Ersetzen Sie:
ENVIRONMENT_NAME
durch den Namen der Umgebung.LOCATION
durch die Region, in der sich die Umgebung befindet.
Die Ausgabe enthält den Namen des Clusters Ihrer Umgebung. Dies kann beispielsweise
europe-west3-example-enviro-af810e25-gke
sein.Erstellen Sie einen Knotenpool, wie unter Knotenpool hinzufügen beschrieben.
Anzahl der Knoten in Ihrer Umgebung erhöhen
Wenn Sie die Anzahl der Knoten in der Cloud Composer-Umgebung erhöhen, steht den Arbeitslasten mehr Rechenleistung zur Verfügung. Durch diese Erhöhung werden keine zusätzlichen Ressourcen für Aufgaben bereitgestellt, die mehr CPU oder RAM benötigen, als der angegebene Maschinentyp bietet.
Zum Erhöhen der Knotenanzahl aktualisieren Sie die Umgebung.
Geben Sie den entsprechenden Maschinentyp an.
Wenn Sie die Cloud Composer-Umgebung erstellen, können Sie einen Maschinentyp angeben. Damit immer genug Ressourcen verfügbar sind, sollten Sie einen Maschinentyp für die Art von Computing angeben, das in Ihrer Cloud Composer-Umgebung ausgeführt wird.
KubernetesPodOperator-Konfiguration
Zum Ausführen dieses Beispiels legen Sie die gesamte kubernetes_pod_operator.py
-Datei in den dags/
-Ordner Ihrer Umgebung. Alternativ können Sie den entsprechenden KubernetesPodOperator
-Code einem DAG hinzufügen.
In den folgenden Abschnitten wird jede KubernetesPodOperator
-Konfiguration im Beispiel erläutert. Informationen zu den einzelnen Konfigurationsvariablen finden Sie in der Airflow-Referenz.
Airflow 2
Airflow 1
Minimalkonfiguration
Zum Erstellen eines KubernetesPodOperator
muss nur name
, namespace
des Pods,
den Pod ausführen. image
ist erforderlich und task_id
sind erforderlich.
Wenn Sie das folgende Code-Snippet in eine DAG einfügen, verwendet die Konfiguration die Standardwerte in /home/airflow/composer_kube_config
. Sie müssen den Code nicht ändern, damit die Aufgabe pod-ex-minimum
erfolgreich ausgeführt wird.
Airflow 2
Airflow 1
Vorlagenkonfiguration
Airflow unterstützt die Verwendung von Jinja-Vorlagen.
Sie müssen die erforderlichen Variablen (task_id
, name
, namespace
, image
) mit dem Operator deklarieren. Wie im folgenden Beispiel gezeigt, können Sie alle anderen Parameter mit Jinja als Vorlage verwenden, einschließlich cmds
, arguments
, env_vars
und config_file
.
Airflow 2
Airflow 1
Wenn der DAG oder die Umgebung nicht geändert wird, schlägt die Aufgabe ex-kube-templates
aufgrund von zwei Fehlern fehl. Die Logs zeigen, dass diese Aufgabe fehlschlägt, weil die entsprechende Variable (my_value
) fehlt. Der zweite Fehler, den Sie nach dem Beheben des ersten Fehlers erhalten können, zeigt, dass die Aufgabe fehlschlägt, weil core/kube_config
nicht in config
gefunden wird.
Gehen Sie wie unten beschrieben vor, um beide Fehler zu beheben.
So legen Sie my_value
mit gcloud
oder der Airflow-UI fest:
Airflow-UI
In der Airflow 2-UI:
Rufen Sie die Airflow-UI auf.
Klicken Sie in der Symbolleiste auf Admin > Variables.
Klicken Sie auf der Seite Listenvariable auf Neuen Eintrag hinzufügen.
Geben Sie auf der Seite Add Variable (Variable hinzufügen) die folgenden Informationen ein:
- Key:
my_value
- Val:
example_value
- Key:
Klicken Sie auf Speichern.
In der Airflow 1-UI:
Rufen Sie die Airflow-UI auf.
Klicken Sie in der Symbolleiste auf Admin > Variables.
Klicken Sie auf der Seite Variablen auf den Tab Erstellen.
Geben Sie auf der Seite Variable die folgenden Informationen ein:
- Key:
my_value
- Val:
example_value
- Key:
Klicken Sie auf Speichern.
gcloud
Geben Sie für Airflow 2 den folgenden Befehl ein:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Geben Sie für Airflow 1 den folgenden Befehl ein:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables -- \
--set my_value example_value
Ersetzen Sie:
ENVIRONMENT
durch den Namen der Umgebung.LOCATION
durch die Region, in der sich die Umgebung befindet.
Wenn Sie auf eine benutzerdefinierte config_file
(Kubernetes-Konfigurationsdatei) verweisen möchten, überschreiben Sie die Airflow-Konfigurationsoption kube_config
durch eine gültige Kubernetes-Konfiguration:
Bereich | Schlüssel | Wert |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
Warten Sie einige Minuten, bis die Umgebung aktualisiert wurde. Führen Sie dann die Aufgabe ex-kube-templates
noch einmal aus und prüfen Sie, ob die Aufgabe ex-kube-templates
erfolgreich ausgeführt wurde.
Konfiguration von Secret-Variablen
Ein Kubernetes-Secret
ist ein Objekt, das sensible Daten enthält. Mit KubernetesPodOperator
können Sie Secrets an die Kubernetes-Pods übergeben.
Secrets müssen in Kubernetes definiert werden. Ansonsten kann der Pod nicht gestartet werden.
Dieses Beispiel zeigt zwei Möglichkeiten für die Verwendung von Kubernetes Secrets: als Umgebungsvariable und als vom Pod bereitgestelltes Volume.
Das erste Secret, airflow-secrets
, ist auf eine Kubernetes-Umgebungsvariable namens SQL_CONN
festgelegt (nicht auf eine Airflow- oder Cloud Composer-Umgebungsvariable).
Das zweite Secret, service-account
, stellt service-account.json
, eine Datei mit einem Dienstkontotoken, in /var/secrets/google
bereit.
Die Secrets sehen so aus:
Airflow 2
Airflow 1
Der Name des ersten Kubernetes-Secrets wird in der Variablen secret
definiert.
Der Name dieses speziellen Secrets lautet airflow-secrets
. Es wird als Umgebungsvariable gemäß der Vorgabe von deploy_type
bereitgestellt. Die Umgebungsvariable, für die es festgelegt wird (deploy_target
), lautet SQL_CONN
. Schließlich lautet der key
des Secrets, das in deploy_target
gespeichert ist, sql_alchemy_conn
.
Der Name des zweiten Kubernetes-Secrets wird in der Variablen secret
definiert.
Der Name dieses speziellen Secrets lautet service-account
. Es wird als Volume bereitgestellt, wie von deploy_type
festgelegt. Der Pfad der bereitzustellenden Datei (deploy_target
) lautet /var/secrets/google
. Schließlich lautet der key
des Secrets, das in deploy_target
gespeichert ist, service-account.json
.
Die Operatorkonfiguration sieht so aus:
Airflow 2
Airflow 1
Wenn Sie keine Änderungen am DAG oder der Umgebung vornehmen, schlägt die Aufgabe ex-kube-secrets
fehl. Aus den Logs geht hervor, dass die Aufgabe aufgrund des Fehlers Pod took too long to start
fehlschlägt. Dieser Fehler tritt auf, weil Airflow das in der Konfiguration angegebene Secret secret_env
nicht finden kann.
gcloud
So legen Sie das Secret mit gcloud
fest:
Informationen über Ihren Cloud Composer-Umgebungscluster abrufen
Führen Sie dazu diesen Befehl aus:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Ersetzen Sie:
ENVIRONMENT
durch den Namen Ihrer Umgebung.LOCATION
durch die Region, in der sich die Cloud Composer-Umgebung befindet.
Die Ausgabe dieses Befehls hat das folgende Format:
projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>
.Zum Abrufen der GKE-Cluster-ID kopieren Sie die Ausgabe nach
/clusters/
(endet auf-gke
).Zum Abrufen der Zone kopieren Sie die Ausgabe nach
/zones/
.
Stellen Sie mit folgendem Befehl eine Verbindung zum GKE-Cluster her:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --zone ZONE
Ersetzen Sie:
CLUSTER_ID
durch Ihre GKE-Cluster-ID.- Ersetzen Sie
PROJECT
durch die ID Ihres Google Cloud-Projekts. ZONE
ist die Zone, in der sich Ihr GKE befindet.
Kubernetes-Secrets erstellen.
Erstellen Sie mit dem folgenden Befehl ein Kubernetes-Secret, das den Wert von
sql_alchemy_conn
auftest_value
festlegt:kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value
Erstellen Sie mit folgendem Befehl ein Kubernetes-Secret, das den Wert von
service-account.json
auf einen lokalen Pfad einer Dienstkontoschlüsseldatei namenskey.json
festlegt:kubectl create secret generic service-account \ --from-file service-account.json=./key.json
Nachdem Sie die Secrets festgelegt haben, führen Sie die Aufgabe
ex-kube-secrets
noch einmal in der Airflow-UI aus.Prüfen Sie, ob die Aufgabe
ex-kube-secrets
erfolgreich ist.
Pod-Affinitätskonfiguration
Mit der Konfiguration des Parameters affinity
in KubernetesPodOperator
können Sie steuern, auf welchen Knoten Pods geplant werden sollen, beispielsweise nur auf Knoten in einem bestimmten Knotenpool. In diesem Beispiel wird der Operator nur in Knotenpools namens pool-0
und pool-1
ausgeführt. Da sich Ihre Cloud Composer 1-Umgebungsknoten im default-pool
befinden, werden Ihre Pods nicht auf den Knoten in Ihrer Umgebung ausgeführt.
Airflow 2
Airflow 1
Da das Beispiel derzeit konfiguriert ist, schlägt die Aufgabe fehl. Aus den Logs geht hervor, dass die Aufgabe fehlschlägt, weil die Knotenpools pool-0
und pool-1
nicht vorhanden sind.
Damit die Knotenpools in values
zu finden sind, nehmen Sie eine der folgenden Konfigurationsänderungen vor:
Wenn Sie einen Knotenpool zuvor erstellt haben, ersetzen Sie
pool-0
undpool-1
durch die Namen Ihrer Knotenpools. Laden Sie dann Ihren DAG noch einmal hoch.Erstellen Sie einen Knotenpool namens
pool-0
oderpool-1
. Sie können beide erstellen, zur erfolgreichen Ausführung der Aufgabe ist aber nur einer erforderlich.Ersetzen Sie
pool-0
undpool-1
durchdefault-pool
. Dies ist der von Airflow verwendete Standardpool. Laden Sie dann Ihren DAG noch einmal hoch.
Warten Sie nach den Änderungen einige Minuten, bis die Umgebung aktualisiert wurde.
Führen Sie dann die Aufgabe ex-pod-affinity
noch einmal aus und prüfen Sie, ob die Aufgabe ex-pod-affinity
erfolgreich ausgeführt wurde.
Vollständige Konfiguration
In diesem Beispiel werden alle Variablen gezeigt, die Sie in KubernetesPodOperator
konfigurieren können. Sie müssen den Code nicht ändern, damit die Aufgabe ex-all-configs
erfolgreich ist.
Weitere Informationen zu den einzelnen Variablen finden Sie in der KubernetesPodOperator
-Referenz von Airflow.
Airflow 2
Airflow 1
Informationen zum CNCF-Kubernetes-Anbieter
GKEStartPodOperator und KubernetesPodOperator sind in
apache-airflow-providers-cncf-kubernetes
-Anbieter.
Fehlerhafte Versionshinweise für den CNCF-Kubernetes-Anbieter finden Sie auf der Website des CNCF-Kubernetes-Anbieters.
Version 6.0.0
In Version 6.0.0 des CNCF-Kubernetes-Anbieterpakets
Die Verbindung kubernetes_default
wird standardmäßig in
KubernetesPodOperator
.
Wenn Sie in Version 5.0.0 eine benutzerdefinierte Verbindung angegeben haben,
weiterhin vom Operator verwendet wird. So wechselst du zurück zur kubernetes_default
Verbindung herstellen, möchten Sie Ihre DAGs möglicherweise entsprechend anpassen.
Version 5.0.0
Mit dieser Version werden einige abwärtsinkompatible Änderungen eingeführt.
im Vergleich zu Version 4.4.0. Die wichtigsten beziehen sich auf
Die kubernetes_default
-Verbindung, die in Version 5.0.0 nicht verwendet wird.
- Die
kubernetes_default
-Verbindung muss geändert werden. Kube-Konfigurationspfad muss auf/home/airflow/composer_kube_config
festgelegt sein (siehe Abbildung 1). Alternativ mussconfig_file
derKubernetesPodOperator
-Konfiguration hinzugefügt werden (wie im folgenden Code gezeigt). )
- </ph>
- So ändern Sie den Code einer Aufgabe mit KubernetesPodOperator:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Weitere Informationen zu Version 5.0.0 finden Sie in den Versionshinweisen für CNCF-Kubernetes-Anbieter.
Fehlerbehebung
Tipps zur Behebung von Pod-Fehlern
Prüfen Sie neben den Aufgabenlogs in der Airflow-UI auch die folgenden Logs:
Ausgabe des Airflow-Planers und der Airflow-Worker:
Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.
Folgen Sie dem Link zu DAGs für Ihre Umgebung.
Gehen Sie in dem Bucket Ihrer Umgebung eine Ebene höher.
Prüfen Sie die Logs im Ordner
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
.
Detaillierte Pod-Logs in der Google Cloud Console unter GKE-Arbeitslasten. Diese Logs enthalten die YAML-Datei der Pod-Definition, Pod-Ereignisse und Pod-Informationen.
Rückgabewerte ungleich null, wenn auch GKEStartPodOperator
verwendet wird
Wenn Sie KubernetesPodOperator
und GKEStartPodOperator
verwenden, bestimmt der Rückgabecode des Einstiegspunkts des Containers, ob die Aufgabe als erfolgreich betrachtet wird oder nicht. Rückgabecodes mit einem Wert ungleich null weisen auf einen Fehler hin.
Ein häufiges Muster bei Verwendung von KubernetesPodOperator
und
GKEStartPodOperator
ist das Ausführen eines Shell-Skripts als Container
Einstiegspunkt zum Gruppieren mehrerer Vorgänge innerhalb des Containers.
Wenn Sie ein solches Skript schreiben, sollten Sie den Befehl set -e
oben im Skript einfügen, sodass fehlgeschlagene Befehle im Skript das Skript beenden und den Fehler an die Airflow-Aufgabeninstanz weiterleiten.
Pod-Zeitüberschreitungen
Das Standardzeitlimit für KubernetesPodOperator
beträgt 120 Sekunden. Dies kann zu Zeitüberschreitungen führen, bevor größere Images heruntergeladen sind. Sie können das Zeitlimit erhöhen, wenn Sie beim Erstellen von KubernetesPodOperator
den Parameter startup_timeout_seconds
ändern.
Wenn eine Pod-Zeitüberschreitung auftritt, ist das aufgabenspezifische Log in der Airflow-UI verfügbar. Beispiel:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod-Zeitüberschreitungen können auch auftreten, wenn Cloud Composer-Dienstkonto nicht über die erforderlichen IAM-Berechtigungen verfügt, um die Aufgabe am Hand. Sehen Sie sich hierzu die Fehler auf Pod-Ebene mit der Methode GKE-Dashboards zum Ansehen der Logs für Ihre oder Cloud Logging verwenden.
Neue Verbindung konnte nicht hergestellt werden
Automatische Upgrades sind in GKE-Clustern standardmäßig aktiviert. Wenn sich ein Knotenpool in einem Cluster befindet, für den gerade ein Upgrade durchgeführt wird, sehen Sie möglicherweise die folgende Fehlermeldung:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Wenn Sie prüfen möchten, ob ein Upgrade des Clusters ausgeführt wird, rufen Sie in der Google Cloud Console die Kubernetes-Cluster und suchen Sie nach dem Ladesymbol neben der den Clusternamen der Umgebung.