1. はじめに
ワークフローはデータ分析における一般的なユースケースのひとつで、データの取り込み、変換、分析によってデータから有益な情報を見つけるために使用されます。Google Cloud Platform では、ワークフローをオーケストレートするためのツールとして Cloud Composer が用意されています。これは、よく利用されているオープンソース ワークフロー ツール Apache Airflow のホスト型バージョンです。このラボでは、Cloud Composer を使用して、Cloud Dataproc クラスタを作成し、Cloud Dataproc と Apache Hadoop を使用して分析し、その後 Cloud Dataproc クラスタを削除するシンプルなワークフローを作成します。
Cloud Composer とは
Cloud Composer は、フルマネージドなワークフロー オーケストレーション サービスです。クラウドとオンプレミス データセンターにまたがるパイプラインの作成、スケジューリング、モニタリングを実現します。Cloud Composer は、広く利用されている Apache Airflow のオープンソース プロジェクト上に構築され、Python プログラミング言語を使用して運用されています。特定のベンダーに縛られることがなく、簡単に使用できます。
Apache Airflow のローカル インスタンスの代わりに Cloud Composer を使用することで、ユーザーはインストールや管理のオーバーヘッドなしに Airflow の長所を活用できます。
Apache Airflow とは
Apache Airflow は、ワークフローの作成、スケジューリング、モニタリングをプログラムで行うために使用されるオープンソース ツールです。このラボで説明する Airflow に関する重要な用語をいくつか紹介します。
- DAG - DAG(有向非巡回グラフ)は、スケジュールを設定して実行する整理されたタスクの集まりです。ワークフローとも呼ばれる DAG は標準の Python ファイルで定義される
- 演算子 - 演算子はワークフロー内の 1 つのタスクを表します。
Cloud Dataproc とは何ですか?
Cloud Dataproc は、Google Cloud Platform のフルマネージド Apache Spark および Apache Hadoop サービスです。Cloud Dataproc は他の GCP サービスと簡単に統合できるため、データ処理、分析、ML のためのパワフルで包括的なプラットフォームとして使用できます。
演習内容
この Codelab では、次のタスクを実行する Apache Airflow ワークフローを Cloud Composer で作成して実行する方法について説明します。
- Cloud Dataproc クラスタを作成する
- Apache Hadoop ワードカウント ジョブをクラスタで実行し、その結果を Cloud Storage に出力する
- クラスタを削除します。
学習内容
- Cloud Composer で Apache Airflow ワークフローを作成して実行する方法
- Cloud Composer と Cloud Dataproc を使用してデータセットで分析を実行する方法
- Google Cloud Platform コンソール、Cloud SDK、Airflow ウェブ インターフェースを介して Cloud Composer 環境にアクセスする方法
必要なもの
- GCP アカウント
- CLI の基本的な知識
- Python の基本的な理解
2. GCP の設定
プロジェクトを作成する
Google Cloud Platform プロジェクトを選択または作成します。
プロジェクト ID をメモしておきます。この ID は後の手順で使用します。
新しいプロジェクトを作成する場合は、作成ページのプロジェクト名のすぐ下にプロジェクト ID が表示されます。 | |
すでにプロジェクトを作成している場合は、コンソールのホームページのプロジェクト情報カードで ID を確認できます。 |
API を有効にする
Cloud Composer、Cloud Dataproc、Cloud Storage API を有効にする。有効にしたら、[認証情報に移動] ボタンは無視して、チュートリアルの次のステップに進みます。 |
Composer 環境を作成する
次の構成で Cloud Composer 環境を作成します。
その他の構成はすべてデフォルトのままにできます。下部の [作成] をクリックします。 |
Cloud Storage バケットを作成する
プロジェクトで、Cloud Storage バケットを作成して次の構成にします。
準備ができたら [作成] をクリックします |
3. Apache Airflow の設定
Composer 環境情報の表示
GCP Console で [環境] ページを開きます。
環境の名前をクリックして詳細を表示します。
[環境の詳細] ページには、Airflow ウェブ インターフェースの URL、Google Kubernetes Engine クラスタ ID、Cloud Storage バケットの名前、/dags フォルダのパスなどの情報が表示されます。
Airflow における DAG(有向非巡回グラフ)とは、スケジュールを設定して実行する、整理されたタスクの集まりです。DAG(ワークフロー)は、標準の Python ファイルで定義されます。Cloud Composer がスケジュールを設定するのは、/dags フォルダ内の DAG のみです。/dags フォルダは、環境を作成するときに Cloud Composer によって自動的に作成される Cloud Storage バケット内にあります。
Apache Airflow 環境変数の設定
Apache Airflow 変数は Airflow 固有のコンセプトであり、環境変数とは異なります。このステップでは、gcp_project
、gcs_bucket
、gce_zone
の 3 つの Airflow 変数を設定します。
gcloud
を使用して変数を設定する
まず、Cloud SDK がインストールされている Cloud Shell を開きます。
環境変数 COMPOSER_INSTANCE
を Composer 環境の名前に設定する
COMPOSER_INSTANCE=my-composer-environment
gcloud コマンドライン ツールを使用して Airflow 変数を設定するには、gcloud composer environments run
コマンドと variables
サブコマンドを使用します。この gcloud composer
コマンドは、Airflow CLI のサブコマンド variables
を実行します。サブコマンドは、引数を gcloud
コマンドライン ツールに渡します。
このコマンドを 3 回実行し、変数をプロジェクトに関連する変数に置き換えます。
次のコマンドを使用して gcp_project
を設定します。<your-project-id> は、ステップ 2 でメモしたプロジェクト ID に置き換えてください。
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcp_project <your-project-id>
出力は次のようになります。
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke. Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc [2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449 [2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database). [2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor [2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied. [2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg [2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
次のコマンドを使用して gcs_bucket
を設定します。<your-bucket-name>
は、ステップ 2 でメモしたバケット ID に置き換えます。推奨に沿ってバケット名を設定した場合、バケット名はプロジェクト ID と同じになります。出力は前のコマンドと同様になります。
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
次のコマンドを使用して gce_zone
を設定します。出力は前のコマンドと同様です。
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gce_zone us-central1-a
(省略可)gcloud
を使用して変数を表示
変数の値を表示するには、Airflow CLI サブコマンド variables
に get
引数を指定して実行するか、Airflow UI を使用します。
例:
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --get gcs_bucket
これは、先ほど設定した 3 つの変数(gcp_project
、gcs_bucket
、gce_zone
)のいずれかで行うことができます。
4. サンプル ワークフロー
ステップ 5 で使用する DAG のコードを見てみましょう。ファイルのダウンロードはまだ行わず、手順に沿って操作してください。
取り上げるべきことがたくさんあるため、少し詳しく見ていきましょう。
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
まず、Airflow のインポートを行います。
airflow.models
- Airflow データベース内のデータにアクセスして作成できます。airflow.contrib.operators
- コミュニティの運営者が居住している国。この場合、Cloud Dataproc API にアクセスするにはdataproc_operator
が必要です。airflow.utils.trigger_rule
- 演算子にトリガールールを追加します。トリガールールを使用すると、演算子を親のステータスに関連して実行するかどうかをきめ細かく制御できます。
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
出力ファイルの場所を指定します。ここで注目すべき行は models.Variable.get('gcs_bucket')
です。これは、Airflow データベースから gcs_bucket
変数値を取得します。
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR
- 最終的に Cloud Dataproc クラスタで実行する .jar ファイルの場所。これは GCP ですでにホストされています。input_file
- Hadoop ジョブが最終的に計算するデータを含むファイルの場所。ステップ 5 で、その場所にデータをまとめてアップロードします。wordcount_args
- jar ファイルに渡す引数。
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
これにより、前日の午前 0 時を表す datetime オブジェクトがこれに相当します。たとえば、これが 3 月 4 日 11 時に実行された場合、datetime オブジェクトは 3 月 3 日 00:00 を表します。これは、Airflow がスケジューリングを処理する方法に関係しています。詳しくは、こちらをご覧ください。
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
新しい DAG が作成されるたびに、辞書形式の default_dag_args
変数を指定する必要があります。
'email_on_failure'
- タスクが失敗したときにメール通知アラートを送信するかどうかを指定します。'email_on_retry'
- タスクの再試行時にメール通知アラートを送信するかどうかを示します。'retries'
- DAG が失敗した場合に Airflow が行う再試行回数を示します。'retry_delay'
- 再試行を試行する前に Airflow が待機する時間を示します。'project_id'
- DAG に関連付ける GCP プロジェクト ID を指定します。これは、後で Dataproc オペレータで必要になります。
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
with models.DAG
を使用すると、その下にあるすべてを同じ DAG 内に含めるようにスクリプトに指示します。また、3 つの引数が渡されていることもわかります。
- 1 つ目の文字列は、作成する DAG に付ける名前です。この例では
composer_hadoop_tutorial
を使用します。 schedule_interval
-datetime.timedelta
オブジェクト。ここでは 1 日に設定しています。つまり、この DAG は、'default_dag_args'
で先ほど設定した'start_date'
の後に 1 日 1 回実行されます。default_args
- DAG のデフォルト引数を含む、前に作成した辞書
Dataproc クラスタを作成する
次に、Cloud Dataproc クラスタを作成する dataproc_operator.DataprocClusterCreateOperator
を作成します。
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
この演算子にはいくつかの引数がありますが、最初の引数以外の引数はすべてこの演算子に固有のものです。
task_id
- BashOperator と同様に、これはオペレーターに割り当てる名前で、Airflow UI から表示できます。cluster_name
- Cloud Dataproc クラスタに割り当てる名前。ここでは、composer-hadoop-tutorial-cluster-{{ ds_nodash }}
という名前を付けています(オプションの追加情報については情報ボックスをご覧ください)。num_workers
- Cloud Dataproc クラスタに割り当てるワーカーの数zone
- クラスタを配置する地理的リージョン(Airflow データベース内に保存されます)。これにより、ステップ 3 で設定した'gce_zone'
変数が読み取られます。master_machine_type
- Cloud Dataproc マスターに割り当てるマシンのタイプworker_machine_type
- Cloud Dataproc ワーカーに割り当てるマシンのタイプ
Apache Hadoop ジョブを送信する
dataproc_operator.DataProcHadoopOperator
を使用すると、Cloud Dataproc クラスタにジョブを送信できます。
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
次のパラメータが用意されています。
task_id
- DAG のこの部分に割り当てる名前main_jar
- クラスタに対して実行する .jar ファイルの場所cluster_name
- ジョブを実行するクラスタの名前。前の演算子と同じです。arguments
- コマンドラインから .jar ファイルを実行する場合と同様に、JAR ファイルに渡される引数
クラスタを削除する
最後に作成する演算子は dataproc_operator.DataprocClusterDeleteOperator
です。
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
名前が示すように、このオペレーターは特定の Cloud Dataproc クラスタを削除します。ここでは 3 つの引数があります。
task_id
- BashOperator と同様に、これはオペレーターに割り当てる名前で、Airflow UI から表示できます。cluster_name
- Cloud Dataproc クラスタに割り当てる名前。ここでは、composer-hadoop-tutorial-cluster-{{ ds_nodash }}
という名前を付けています(オプションの追加情報については、「Dataproc クラスタを作成する」の後の情報ボックスをご覧ください)。trigger_rule
- この手順の冒頭で、インポート時にトリガールールについて簡単に触れましたが、ここでは実際のルールを見てみましょう。デフォルトでは、Airflow オペレーターは、上流の演算子がすべて正常に完了しない限り実行されません。ALL_DONE
トリガールールでは、成功したかどうかにかかわらず、上流の演算子がすべて完了していることのみが必要です。ここでは、Hadoop ジョブが失敗した場合でも、クラスタを破棄することを意味します。
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
最後に、これらの演算子を特定の順序で実行します。これは、Python のビットシフト演算子を使用して指定できます。この場合、create_dataproc_cluster
が常に最初に実行され、次に run_dataproc_hadoop
、最後に delete_dataproc_cluster
が実行されます。
まとめると、コードは次のようになります。
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. Airflow ファイルを Cloud Storage にアップロードする
DAG を /dags フォルダにコピーする
- まず、Cloud Shell を開きます。Cloud SDK が便利にインストールされています。
- Python サンプル リポジトリのクローンを作成し、composer/workflows ディレクトリに移動します。
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- 次のコマンドを実行して、DAG フォルダの名前を環境変数に設定します。
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \ --location us-central1 --format="value(config.dagGcsPrefix)")
- 次の
gsutil
コマンドを実行して、/dags フォルダの作成場所にチュートリアル コードをコピーします
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
出力は次のようになります。
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. Airflow UI を使用する
GCP コンソールを使用して Airflow ウェブ インターフェースにアクセスするには:
|
Airflow UI について詳しくは、ウェブ インターフェースへのアクセスをご覧ください。
変数を表示する
先ほど設定した変数は、環境内に保持されています。Airflow UI のメニューバーで [管理 > 変数] を選択すると、変数を確認できます。
DAG の実行状況を確認する
DAG ファイルを Cloud Storage の dags
フォルダにアップロードすると、ファイルが Cloud Composer によって解析されます。エラーが検出されなかった場合、そのワークフローの名前が DAG のリストに表示され、すぐに実行されるようキューに登録されます。DAG を表示するには、ページの上部にある [DAG] をクリックします。
composer_hadoop_tutorial
をクリックして DAG の詳細ページを開きます。このページでは、ワークフローのタスクと依存関係が図で示されます。
ツールバーで [Graph View] をクリックし、各タスクのグラフィックにカーソルを合わせてステータスを表示します。各タスクを囲む線の色もステータスを表しています(緑は実行中、赤は失敗など)。
グラフビューからワークフローをもう一度実行するには、次の手順を行います。
- Airflow UI のグラフビューで、
create_dataproc_cluster
グラフィックをクリックします。 - [Clear] をクリックして 3 つのタスクをリセットし、[OK] をクリックして確定します。
次の GCP Console ページに移動して、composer-hadoop-tutorial
ワークフローのステータスと結果を確認することもできます。
- Cloud Dataproc クラスタ: クラスタの作成と削除をモニタリングします。ワークフローによって作成されるクラスタは、ワークフローの間にのみ存在し、最後のワークフロー タスクの一部として削除されます。
- Cloud Dataproc ジョブ: Apache Hadoop ワードカウント ジョブを表示またはモニタリングします。ジョブ ID をクリックすると、ジョブのログ出力を確認できます。
- Cloud Storage ブラウザ。この Codelab 用に作成した Cloud Storage バケット内の
wordcount
フォルダのワードカウントの結果を表示します。
7. クリーンアップ
この Codelab で使用したリソースについて GCP アカウントに課金されないようにする手順は次のとおりです。
- (省略可)データを保存するには、Cloud Composer 環境の Cloud Storage バケットと、この Codelab 用に作成したストレージ バケットからデータをダウンロードします。
- この Codelab で作成した Cloud Storage バケットを削除します。
- 環境の Cloud Storage バケットを削除します。
- Cloud Composer 環境を削除します。環境を削除しても、環境の Storage バケットは削除されません。
必要に応じて、プロジェクトを削除することもできます。
- GCP コンソールで [プロジェクト] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。