G-gen の堂原です。マネージドな Apache Airflow 環境を提供する Cloud Composer について、Cloud Composer 2 を中心に解説します。
はじめに
Cloud Composer は Google Cloud (旧称 GCP) のフルマネージドな、データパイプライン用のワークフロー管理サービスで、 実体は Apache Airflow をマネージドな環境で提供するサービスです。
そのため、本記事ではまず Airflow について紹介し、その後に改めて Cloud Composer (メジャーバージョン 2) について紹介します。
※ 一部 Airflow のコンポーネントなどは Cloud Composer の方で解説しています。
Airflow とは
概要
Airflow は、 元は Airbnb 社で開発され現在は Apache Software Foundation のプロジェクトとなっている OSS で、データパイプライン用のワークフロー管理ツールです。
例えば下図のように
task_1
の後にtask_2
を処理するtask_2
処理後、task_3
とtask_4
が並列で処理されるtask_3
とtask_4
処理後、task_5
が処理される
などといった処理の順序や依存関係を組み立てることができます。
Airflow においては、個々の処理 (Task と呼ばれます) の順番や依存関係を DAG (Directed Acyclic Graph) というもので定義します。 また、各 Task は Operator と呼ばれるテンプレートを用いて作成することができます。
これらは Python で記述され、上図を実現する Python コードは以下のようになります。
(下のコードだと、各 Task はただ echo
を実行しているだけですが、実際はここに ETL (Extract, Transform, Load) 処理を当てはめていくことになります。)
import airflow from airflow import DAG from airflow.operators.bash import BashOperator from airflow.utils.dates import days_ago with DAG( 'test-dag', description='test dag', schedule_interval='*/10 * * * *', start_date=airflow.utils.dates.days_ago(1), catchup=False, max_active_runs=1) as dag: task_1 = BashOperator( task_id='task_1', bash_command='echo "Task 1"', dag=dag) task_2 = BashOperator( task_id='task_2', bash_command='echo "Task 2"', dag=dag) task_3 = BashOperator( task_id='task_3', bash_command='echo "Task 3"', dag=dag) task_4 = BashOperator( task_id='task_4', bash_command='echo "Task 4"', dag=dag) task_5 = BashOperator( task_id='task_5', bash_command='echo "Task 5"', dag=dag) task_1 >> task_2 >> [task_3, task_4] >> task_5
特徴
様々なサービス・ツールと連携可能
Airflow は様々なツールやサービスと連携し、ETL 処理を実行することが可能です。
データの取得・格納先として、例えば Google Cloud であれば BigQuery や Google Cloud Storage (GCS)、AWS であれば Redshift や S3、その他各種データベース (MySQL, PostgreSQL, MongoDB など) が選択できます。
データの処理基盤としては、大規模なデータの処理が行える OSS である Apache Beam や、Kubernetes などが選択でき、Python で記述した簡単な処理であれば Airflow の基盤上で実行するといったことも可能です。
クラウドサービスを使う場合、Cloud SDK や AWS SDK for Python (Boto3) 等を使って各処理を書き込む必要はなく、
用途に合った Operator を選択し、各パラメータを記載するだけで Task を作成し処理を実現することができます。
(勿論、権限設定やライブラリのインストール自体は必要となります。)
例えば BigQuery テーブルからデータを取得したい場合は BigQueryGetDataOperator
を用いて Task を作成します。
get_data_from_bq = BigQueryGetDataOperator( task_id = 'get_data_from_bq', project_id='test-project', dataset_id = 'test_dataset', table_id = 'test_table', dag = dag )
定期実行されるバッチ方式のワークフローに特化
オンデマンドに実行することも可能ですが、Airflow は定期的に実行されるバッチ方式のワークフローに特化しています。 (逆に言えば Airflow はストリーミング処理には適していません。)
そのため、Airflow では定期実行を意識した機能が用意されています。
例えばバックフィル機能を使えば、スタート時点から、DAG で定めた間隔分だけワークフローを実行してくれます。
例 : スタート時点を一ヶ月前・間隔を 1 日にすると、過去 30 日分のワークフローを順に実行
当日に追加されたデータのみを処理するといったワークフローを実装している場合、本機能を用いることで、過去分のデータまで処理させるといったことが可能になります。
可視性
Airflow では、DAG の設定や実行、個々のタスクの実行結果の確認などができる Airflow UI が提供されています。
下図は、DAG のこれまでの実行結果を確認できるページです。
この他にも
- DAG のリアルタイムの実行状況
- カレンダー形式での日次の実施状況
- 各実施結果のログ
などの情報が Airflow UI で確認可能であり、優れたモニタリング機能となっています。
Cloud Composer とは
概要
改めて、Cloud Composer は Google Cloud (旧称 GCP) のフルマネージドな、Apache Airflow 環境を提供するサービスです。
ユーザが DAG を GCS にアップロードすると、Cloud Composer がワークフローの実行や計算リソースのスケーリングを管理してくれます。
基本的な利用の流れは以下のようになります。
- バージョンや計算リソースのスペックを指定して Cloud Composer 環境 を作成
(以後、「環境」と記載する場合は Cloud Composer 環境を指します)- 計算リソースのスペックは後から変更可能です
- 環境毎に作成される GCS に DAG を記載した Python ソースコードをアップロード
- Cloud Composer が DAG を自動で検知し、記載されたトリガーに従ってワークフローの実行・管理を行う
また、以下のようなことを Google Cloud コンソールから確認・実施することが可能と、高い利便性が提供されています。
- 各 DAG のグラフやソースコードが確認可能
- Airflow UI へのアクセス
- Airflow の構成オプションや環境変数、PyPI パッケージの上書きや追加・変更
メリット
先述した Airflow の特徴も踏まえ、Cloud Composer には以下のようなメリットが存在します。
- Python で、複雑なパイプラインを簡単に構築することができる
- Google Cloud の複数のサービスを横断するパイプラインの構築が可能
- Google Cloud だけでなく、ハイブリッドやマルチクラウドにも対応
例 : S3 から GCS を経由して BigQuery にデータ保存などといったパイプラインを構築可能
- 複雑な Airflow 環境を簡単に立ち上げられる
- 計算リソースの自動スケーリング機能あり(Cloud Composer 2 のみ)
- ただしサーバレスではないため、各コンポーネント (後述) のスペックのチューニングは必要となります
苦手な点
Cloud Composer は、逆に以下のような点を苦手としています。
- Python でのパイプライン構築が必須
- 簡単なパイプラインを非エンジニアの方が作成したいのなら、Dataprep や Cloud Data Fusion といったノーコードサービスのほうが適しています
- ストリーミング処理
- Google Cloud なら、Dataflow が適任です
バージョン
概要
Cloud Composer においては高頻度で新しいバージョンがリリースされています。
各バージョンは composer-a.b.c-airflow-x.y.z
という形式で表され、a.b.c
が Cloud Composer のバージョンで x.y.z
が Airflow のバージョンとなります。
基本的に Cloud Composer の 1 つのバージョンにつき、2 つの Airflow マイナーバージョンがサポートされます。 例えば Cloud Composer 2.2.1 であれば、Airflow 2.5.1 と Airflow 2.4.3 をサポートします。
メジャーバージョン
Cloud Composer においては 2 つのメジャーバージョン、Cloud Composer 1 と Cloud Composer 2 が存在します。 数字の通り Cloud Composer 2 が後継です。
機能面の大きな違いとして Cloud Composer 1 は後述する Airflow ワーカーのスケーリングが手動で、Cloud Composer 2 は自動スケーリングとなっています。
また、 Cloud Composer 1 は Standard モードの Google Kubernetes Engine (GKE) クラスタで実装されている一方、Cloud Composer 2 は Autopilot モードの GKE クラスタで構築されており、アーキテクチャ的にも大きく異なっています。
Cloud Composer 1 は、2023 年 3 月 24 日にリリースされたバージョン 1.20.11 が最後で、今後はバグの修正と小規模な改善のみが行われます。 そのため、現在 Cloud Composer を使用する場合は、Cloud Composer 2 の使用が推奨されます。
公式ドキュメントにおいては、Cloud Composer 1 用のページと Cloud Composer 2 用のページが用意されているので、間違えないようご注意ください。
ライフサイクル
Cloud Composer の各バージョンはリリース日を基準として、以下のようなサポート期間が設けられています。
- 0 - 12 ヶ月 : 完全サポート
- 12 - 18 ヶ月 : セキュリティに関する通知のみ行われる
- 18 ヶ月 - : 全てユーザ管理
常にサポート対象のバージョンにアップグレードすることが推奨されてはいますが、2023 年 6 月時点では、サポート期間終了後も強制アップグレードは実施されず、同じバージョンを使用し続けることが可能です。
アップグレード
2023 年 6 月時点ではプレビュー機能となっていますが、Cloud Composer においては環境のアップグレード機能が提供されています。
また、アップグレードによって PyPI パッケージの競合が発生しないかを事前に確認することも可能です。
ただし先述した通り、本機能はプレビュー機能となっているため、本番環境で使用する前にしっかり検証しておく必要があります。
- 参考 : 環境をアップグレードする
コンポーネント
概要
ここからは改めて Cloud Composer 2 についての解説となります。
Cloud Composer 2 は複数の Google Cloud サービス上に存在する複数のコンポーネントから成り立っていますが、主に意識する必要のあるコンポーネントは以下の通りです。
コンポーネント | コンポーネントを実装している Google Cloud サービス |
---|---|
Airflow ワーカー | Autopilot モードの GKE クラスタ |
Airflow スケジューラ | Autopilot モードの GKE クラスタ |
Airflow ウェブサーバ | Autopilot モードの GKE クラスタ |
Airflow トリガラー | Autopilot モードの GKE クラスタ |
Airflow データベース | Cloud SQL インスタンス |
環境用バケット | GCS バケット |
上表のコンポーネントを実装している Google Cloud サービスの内、GKE クラスタと GCS については Google Cloud コンソールから確認が可能です。 ただし、環境が壊れる可能性があるため、GKE クラスタの変更は非推奨です。
- 参考 : 環境コンポーネント
Airflow ワーカー
Airflow ワーカーは、環境に存在する DAG の個々の Task を実行する、Cloud Composer における計算リソースとなります。
Task 量に応じて自動でスケーリングが行われます。
各ワーカーのスペック (vCPU、メモリサイズ、ストレージサイズ) と、スケーリングにおけるワーカーの最小数と最大数が指定できます。
Airflow スケジューラ
Airflow スケジューラは、環境に存在する DAG の実行と、各 DAG 個々の Task の実行のスケジューリングを制御します。
個々の Task は GKE クラスタ内に実装されているキューを通して、スケジューラからワーカーへ分配されていますが、ユーザがキューを意識する必要はほとんどありません。
スケジューラにおいては、スケジューラのスペック (vCPU、メモリサイズ、ストレージサイズ) と、実行するスケジューラの数を指定できます。
Airflow ウェブサーバ
Airflow ウェブサーバは Airflow UI を提供するコンポーネントです。
ウェブサーバはスペック (vCPU、メモリサイズ、ストレージサイズ) を指定できます。
Airflow UI へのアクセス権限は IAM を用いて管理することが可能で、アクセスには composer.environments.get
権限が必要です。
また、指定した CIDR からのみのアクセスを許可する、IP アドレスベースでのアクセス制御も可能です。
Airflow トリガラー
Airflow トリガラーは、Airflow の Deferrable Operators (Google Cloud の日本語ページだと「遅延可能な演算子」と記載されています) という機能の管理に用いられます。
通常、BigQuery ジョブを呼び出したりと、外部のシステム上で処理行うような Task を実行している最中も、その Task はワーカースロットを占有してしまいます。 Deferrable Operators は、そのような処理の監視をトリガラーが代わりに行うことで、ワーカースロットを解放することができる機能です。
Cloud Composer においてはトリガラーの数は 0 にすることも可能で、トリガラーの数を 1 以上にすることで Deferrable Operators が有効化されます。 また、トリガラーのスペック (vCPU、メモリサイズ、ストレージサイズ) も指定できます。
Airflow データベース
Airflow データベースは、Airflow のメタデータデータベースをホストしています。
データベースの実体である Cloud SQL インスタンスは Google の方で完全に管理されており、Google Cloud コンソールでは確認することもできません。
データベースのスペック調整は、Airflow スケジューラで触れた Airflow キューなどとひとまとめにされた、コアインフラストラクチャ単位で行われます。 コアインフラストラクチャは「小、中、大」の 3 つの環境サイズが用意されており、DAG の数や規模感に応じて選択することになります。
ただし、データベースのディスク容量のみ、必要に応じて自動的に増加します。
- 参考 : 環境サイズを変更する
- 参考 : データベースのディスク容量
料金
前提
Cloud Composer 2 の料金体系を理解するのは、これまで紹介した内容を踏まえ、以下の点を理解しておく必要があります。
- Cloud Composer 1 と Cloud Composer 2 では料金体系が大きく異なる
- Airflow ワーカー、スケジューラ、ウェブサーバ及びトリガラーは vCPU、メモリサイズ、ストレージサイズについて調整が可能
- Airflow データベースや Airflow キューなどのコンポーネントはまとめてコアインフラストラクチャという単位でサイズ調整を行う
- ただし、データベースのディスク容量のみ、必要に応じて自動的に増加
種類
Cloud Composer 2 においては、主に以下の料金が発生します。
- Cloud Composer コンピューティング料金
- Airflow データベースのストレージ料金
- コアインフラストラクチャ料金
ただし、環境毎に用意される GCS バケットや、後述するプライベート環境を構築する際に使用される Private Service Connect についても別途料金が発生します。
料金表は以下の公式ページを参照ください。
Cloud Composer コンピューティング料金
Cloud Composer コンピューティング料金は、Airflow ワーカー、スケジューラ、ウェブサーバ及びトリガラーがそれぞれ使用している vCPU、メモリ、ストレージに対して発生する料金です。 ワーカーやスケジューラ、トリガラーについては、指定したスペックに起動数を掛けた値となります。
vCPU、メモリサイズ、ストレージサイズはそれぞれ個別に計算され、時間単位で料金が発生します。
Airflow データベースのストレージ料金
Airflow データベースのディスク容量は必要に応じて自動的に増加し、そのサイズに応じて料金が発生します。
コアインフラストラクチャ料金
コアインフラストラクチャ料金は、コアインフラストラクチャの 3 つの環境サイズ 「小、中、大」に応じて発生する料金です。
後述する復元力モードが「標準的な復元力」か「高い復元力」によっても料金が異なり、「高い復元力」の方が料金は高くなります。
コスト最適化
Cloud Composer の費用を最適化する方法が、以下で紹介されています。
- 参考 : 環境のパフォーマンスと費用を最適化する
Cloud Composer 2 においては、まず「小、中、大」の粒度のプリセットで用意されているスペックのいずれかで利用を開始します。
※ 「プリセットスペック = Airflow ワーカー、スケジューラ、ウェブサーバ及びトリガラーのスペック + コアインフラストラクチャの環境サイズ」であり、コアインフラストラクチャの環境サイズ「小、中、大」とイコールではありません。
その後、実際のワークロード (DAG) を実行してパフォーマンスを観察します。後述の「モニタリング」の項で紹介する通り、Composer のパフォーマンスは Cloud Monitoring で自動的に収集されています。その収集結果を見ながら、各コンポーネントのスペックを調整していきます。その際は、Airflow ワーカーのみスペックを下げるなど、個別に調整することも可能です。
セキュリティ
サービスアカウント
Cloud Composer においては、2 つのサービスアカウントが登場します。
- 環境のサービスアカウント
- Cloud Composer のサービスエージェントアカウント
環境のサービスアカウントは環境毎に設定するもので、環境内のワークフローはこのサービスアカウントの権限を使用して他の Google Cloud サービスにアクセスします。
環境のサービスアカウントには、デフォルトの Compute Engine サービスアカウントまたはユーザが作成したサービスアカウントを用いることが可能です。 ユーザが作成したサービスアカウントを用いる場合は、そのサービスアカウントに Composer ワーカーロールを付与する必要があります。
一方、Cloud Composer のサービスエージェントアカウントは Google が管理しており、プロジェクト内の全ての環境で使用されています。
環境作成時は Cloud Composer のサービスエージェントアカウントを用いて Workload Identity 設定を行う都合上、Cloud Composer のサービスエージェントアカウントに環境のサービスアカウントに対する Cloud Composer v2 API サービス エージェント拡張機能ロールを付与する必要があります。
まとめると、環境構築時、基本的には以下の権限を各サービスアカウントに付与する必要があります。
- 環境のサービスアカウント : プロジェクトに対する Composer ワーカーロール
Cloud Composer のサービスエージェントアカウント : 環境のサービスアカウントに対する Cloud Composer v2 API サービス エージェント拡張機能ロール
参考 : IAM を使用したアクセス制御
プライベート環境
通常 Cloud Composer においては、GKE クラスタと Cloud SQL インスタンスに対してパブリック IP アドレスが付与されます。
プライベート環境設定を行うと、GKE クラスタは限定公開クラスタとして作成され、また Cloud SQL はプライベート IP アドレスのみを有するようになり、セキュアな環境を構築できます。
限定公開クラスタについては以下の記事で紹介しています。
また、プライベート環境設定においても以下のオプションがあります。
- Google 管理の VPC (Cloud SQL インスタンスが存在) とユーザ管理の VPC (GKE クラスタが存在) 間の接続方法
- Private Service Connect
- VPC ピアリング
- GKE クラスタのコントロールプレーンのパブリックエンドポイントの有効・無効化
これらの設定は環境作成時のみに設定可能で、作成後の変更はできません。
プライベート環境にした場合、ワークフローから外部ネットワークに接続するためには Cloud NAT 等が必要となります。
また、GKE クラスタのコントロールプレーンのパブリックエンドポイントを無効にした場合、ローカル環境からの Airflow CLI コマンドの実行ができなくなります。
もう一点注意点として、Airflow ウェブサーバ (Airflow UI) へのアクセスはプライベート環境にしても制限されません。
- 参考 : プライベート IP 環境
耐障害性
スナップショット
Cloud Composer においては、以下のようなデータのスナップショットを定期的にまたはオンデマンドに保存し、環境の復元ができます。
- Airflow 構成オプションの上書き、環境変数、PyPI パッケージ
- Airflow データベースのバックアップ
- 環境用 GCS バケットの
/dag
、/data
及び/plugins
フォルダ
スナップショットはデフォルトでは環境用 GCS バケットの /snapshots
フォルダに保存されますが、宛先の変更も可能です。
- 参考 : 環境のスナップショットの保存と読み込み
復元力モード
Cloud Composer においては、復元力モードというものが指定できます。 復元力モードを「高い復元力」とした場合、環境が複数のゾーンを跨る形で構築され耐障害性が増します。
具体的には以下のような構成になります。
- 2 つ以上の Airflow ワーカー及び 2 つの Ariflow スケジューラ、ウェブサーバ、トリガラーが異なるゾーンに配置される
- Airflow データベースを構成する Cloud SQL インスタンスは高可用性モードで実行される
注意点として、「高い復元力」はプライベート環境のみで選択可能です。
また、コアインフラストラクチャに対して通常より高い料金が発生します。
モニタリング
Cloud Composer は Cloud Monitoring と統合されており、各コンポーネントの CPU 使用率、メモリ使用量などのメトリクスが自動的に収集されています。
これにより、コンソール画面で Airflow ワーカー、スケジューラ、ウェブサーバ、トリガラーおよびデータベースの稼働状況を確認することができます。
Cloud Composer においては各コンポーネントのスペックを後から変更することが可能なため、これらの稼働状況は適切なスペックを判断するのに役立ちます。前述した「コスト最適化」の項もご参照ください。
堂原 竜希(記事一覧)
クラウドソリューション部データアナリティクス課。2023年4月より、G-genにジョイン。
Google Cloud Partner Top Engineer 2023, 2024に選出 (2024年はRookie of the yearにも選出)。休みの日はだいたいゲームをしているか、時々自転車で遠出をしています。
Follow @ryu_dohara