Dataproc クラスタでの Hadoop ワードカウント ジョブの実行

1. はじめに

ワークフローはデータ分析における一般的なユースケースのひとつで、データの取り込み、変換、分析によってデータから有益な情報を見つけるために使用されます。Google Cloud Platform では、ワークフローをオーケストレートするためのツールとして Cloud Composer が用意されています。これは、よく利用されているオープンソース ワークフロー ツール Apache Airflow のホスト型バージョンです。このラボでは、Cloud Composer を使用して、Cloud Dataproc クラスタを作成し、Cloud Dataproc と Apache Hadoop を使用して分析し、その後 Cloud Dataproc クラスタを削除するシンプルなワークフローを作成します。

Cloud Composer とは

Cloud Composer は、フルマネージドなワークフロー オーケストレーション サービスです。クラウドとオンプレミス データセンターにまたがるパイプラインの作成、スケジューリング、モニタリングを実現します。Cloud Composer は、広く利用されている Apache Airflow のオープンソース プロジェクト上に構築され、Python プログラミング言語を使用して運用されています。特定のベンダーに縛られることがなく、簡単に使用できます。

Apache Airflow のローカル インスタンスの代わりに Cloud Composer を使用することで、ユーザーはインストールや管理のオーバーヘッドなしに Airflow の長所を活用できます。

Apache Airflow とは

Apache Airflow は、ワークフローの作成、スケジューリング、モニタリングをプログラムで行うために使用されるオープンソース ツールです。このラボで説明する Airflow に関する重要な用語をいくつか紹介します。

  • DAG - DAG(有向非巡回グラフ)は、スケジュールを設定して実行する整理されたタスクの集まりです。ワークフローとも呼ばれる DAG は標準の Python ファイルで定義される
  • 演算子 - 演算子はワークフロー内の 1 つのタスクを表します。

Cloud Dataproc とは何ですか?

Cloud Dataproc は、Google Cloud Platform のフルマネージド Apache Spark および Apache Hadoop サービスです。Cloud Dataproc は他の GCP サービスと簡単に統合できるため、データ処理、分析、ML のためのパワフルで包括的なプラットフォームとして使用できます。

演習内容

この Codelab では、次のタスクを実行する Apache Airflow ワークフローを Cloud Composer で作成して実行する方法について説明します。

  • Cloud Dataproc クラスタを作成する
  • Apache Hadoop ワードカウント ジョブをクラスタで実行し、その結果を Cloud Storage に出力する
  • クラスタを削除します。

学習内容

  • Cloud Composer で Apache Airflow ワークフローを作成して実行する方法
  • Cloud Composer と Cloud Dataproc を使用してデータセットで分析を実行する方法
  • Google Cloud Platform コンソール、Cloud SDK、Airflow ウェブ インターフェースを介して Cloud Composer 環境にアクセスする方法

必要なもの

  • GCP アカウント
  • CLI の基本的な知識
  • Python の基本的な理解

2. GCP の設定

プロジェクトを作成する

Google Cloud Platform プロジェクトを選択または作成します。

プロジェクト ID をメモしておきます。この ID は後の手順で使用します。

新しいプロジェクトを作成する場合は、作成ページのプロジェクト名のすぐ下にプロジェクト ID が表示されます。

すでにプロジェクトを作成している場合は、コンソールのホームページのプロジェクト情報カードで ID を確認できます。

API を有効にする

Cloud Composer、Cloud Dataproc、Cloud Storage API を有効にする。有効にしたら、[認証情報に移動] ボタンは無視して、チュートリアルの次のステップに進みます。

Composer 環境を作成する

次の構成で Cloud Composer 環境を作成します。

  • 名前: my-composer-environment
  • ロケーション: us-central1
  • ゾーン: us-central1-a

その他の構成はすべてデフォルトのままにできます。下部の [作成] をクリックします。

Cloud Storage バケットを作成する

プロジェクトで、Cloud Storage バケットを作成して次の構成にします。

  • Name: <your-project-id>
  • デフォルトのストレージ クラス: マルチリージョン
  • 所在地: 米国
  • アクセス制御モデル: きめ細かい

準備ができたら [作成] をクリックします

3. Apache Airflow の設定

Composer 環境情報の表示

GCP Console で [環境] ページを開きます。

環境の名前をクリックして詳細を表示します。

[環境の詳細] ページには、Airflow ウェブ インターフェースの URL、Google Kubernetes Engine クラスタ ID、Cloud Storage バケットの名前、/dags フォルダのパスなどの情報が表示されます。

Airflow における DAG(有向非巡回グラフ)とは、スケジュールを設定して実行する、整理されたタスクの集まりです。DAG(ワークフロー)は、標準の Python ファイルで定義されます。Cloud Composer がスケジュールを設定するのは、/dags フォルダ内の DAG のみです。/dags フォルダは、環境を作成するときに Cloud Composer によって自動的に作成される Cloud Storage バケット内にあります。

Apache Airflow 環境変数の設定

Apache Airflow 変数は Airflow 固有のコンセプトであり、環境変数とは異なります。このステップでは、gcp_projectgcs_bucketgce_zone の 3 つの Airflow 変数を設定します。

gcloud を使用して変数を設定する

まず、Cloud SDK がインストールされている Cloud Shell を開きます。

環境変数 COMPOSER_INSTANCE を Composer 環境の名前に設定する

COMPOSER_INSTANCE=my-composer-environment

gcloud コマンドライン ツールを使用して Airflow 変数を設定するには、gcloud composer environments run コマンドと variables サブコマンドを使用します。この gcloud composer コマンドは、Airflow CLI のサブコマンド variables を実行します。サブコマンドは、引数を gcloud コマンドライン ツールに渡します。

このコマンドを 3 回実行し、変数をプロジェクトに関連する変数に置き換えます。

次のコマンドを使用して gcp_project を設定します。<your-project-id> は、ステップ 2 でメモしたプロジェクト ID に置き換えてください。

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

出力は次のようになります。

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

次のコマンドを使用して gcs_bucket を設定します。<your-bucket-name> は、ステップ 2 でメモしたバケット ID に置き換えます。推奨に沿ってバケット名を設定した場合、バケット名はプロジェクト ID と同じになります。出力は前のコマンドと同様になります。

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

次のコマンドを使用して gce_zone を設定します。出力は前のコマンドと同様です。

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(省略可)gcloud を使用して変数を表示

変数の値を表示するには、Airflow CLI サブコマンド variablesget 引数を指定して実行するか、Airflow UI を使用します。

例:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

これは、先ほど設定した 3 つの変数(gcp_projectgcs_bucketgce_zone)のいずれかで行うことができます。

4. サンプル ワークフロー

ステップ 5 で使用する DAG のコードを見てみましょう。ファイルのダウンロードはまだ行わず、手順に沿って操作してください。

取り上げるべきことがたくさんあるため、少し詳しく見ていきましょう。

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

まず、Airflow のインポートを行います。

  • airflow.models - Airflow データベース内のデータにアクセスして作成できます。
  • airflow.contrib.operators - コミュニティの運営者が居住している国。この場合、Cloud Dataproc API にアクセスするには dataproc_operator が必要です。
  • airflow.utils.trigger_rule - 演算子にトリガールールを追加します。トリガールールを使用すると、演算子を親のステータスに関連して実行するかどうかをきめ細かく制御できます。
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

出力ファイルの場所を指定します。ここで注目すべき行は models.Variable.get('gcs_bucket') です。これは、Airflow データベースから gcs_bucket 変数値を取得します。

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR - 最終的に Cloud Dataproc クラスタで実行する .jar ファイルの場所。これは GCP ですでにホストされています。
  • input_file - Hadoop ジョブが最終的に計算するデータを含むファイルの場所。ステップ 5 で、その場所にデータをまとめてアップロードします。
  • wordcount_args - jar ファイルに渡す引数。
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

これにより、前日の午前 0 時を表す datetime オブジェクトがこれに相当します。たとえば、これが 3 月 4 日 11 時に実行された場合、datetime オブジェクトは 3 月 3 日 00:00 を表します。これは、Airflow がスケジューリングを処理する方法に関係しています。詳しくは、こちらをご覧ください。

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

新しい DAG が作成されるたびに、辞書形式の default_dag_args 変数を指定する必要があります。

  • 'email_on_failure' - タスクが失敗したときにメール通知アラートを送信するかどうかを指定します。
  • 'email_on_retry' - タスクの再試行時にメール通知アラートを送信するかどうかを示します。
  • 'retries' - DAG が失敗した場合に Airflow が行う再試行回数を示します。
  • 'retry_delay' - 再試行を試行する前に Airflow が待機する時間を示します。
  • 'project_id' - DAG に関連付ける GCP プロジェクト ID を指定します。これは、後で Dataproc オペレータで必要になります。
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

with models.DAG を使用すると、その下にあるすべてを同じ DAG 内に含めるようにスクリプトに指示します。また、3 つの引数が渡されていることもわかります。

  • 1 つ目の文字列は、作成する DAG に付ける名前です。この例では composer_hadoop_tutorial を使用します。
  • schedule_interval - datetime.timedelta オブジェクト。ここでは 1 日に設定しています。つまり、この DAG は、'default_dag_args' で先ほど設定した 'start_date' の後に 1 日 1 回実行されます。
  • default_args - DAG のデフォルト引数を含む、前に作成した辞書

Dataproc クラスタを作成する

次に、Cloud Dataproc クラスタを作成する dataproc_operator.DataprocClusterCreateOperator を作成します。

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

この演算子にはいくつかの引数がありますが、最初の引数以外の引数はすべてこの演算子に固有のものです。

  • task_id - BashOperator と同様に、これはオペレーターに割り当てる名前で、Airflow UI から表示できます。
  • cluster_name - Cloud Dataproc クラスタに割り当てる名前。ここでは、composer-hadoop-tutorial-cluster-{{ ds_nodash }} という名前を付けています(オプションの追加情報については情報ボックスをご覧ください)。
  • num_workers - Cloud Dataproc クラスタに割り当てるワーカーの数
  • zone - クラスタを配置する地理的リージョン(Airflow データベース内に保存されます)。これにより、ステップ 3 で設定した 'gce_zone' 変数が読み取られます。
  • master_machine_type - Cloud Dataproc マスターに割り当てるマシンのタイプ
  • worker_machine_type - Cloud Dataproc ワーカーに割り当てるマシンのタイプ

Apache Hadoop ジョブを送信する

dataproc_operator.DataProcHadoopOperator を使用すると、Cloud Dataproc クラスタにジョブを送信できます。

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

次のパラメータが用意されています。

  • task_id - DAG のこの部分に割り当てる名前
  • main_jar - クラスタに対して実行する .jar ファイルの場所
  • cluster_name - ジョブを実行するクラスタの名前。前の演算子と同じです。
  • arguments - コマンドラインから .jar ファイルを実行する場合と同様に、JAR ファイルに渡される引数

クラスタを削除する

最後に作成する演算子は dataproc_operator.DataprocClusterDeleteOperator です。

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

名前が示すように、このオペレーターは特定の Cloud Dataproc クラスタを削除します。ここでは 3 つの引数があります。

  • task_id - BashOperator と同様に、これはオペレーターに割り当てる名前で、Airflow UI から表示できます。
  • cluster_name - Cloud Dataproc クラスタに割り当てる名前。ここでは、composer-hadoop-tutorial-cluster-{{ ds_nodash }} という名前を付けています(オプションの追加情報については、「Dataproc クラスタを作成する」の後の情報ボックスをご覧ください)。
  • trigger_rule - この手順の冒頭で、インポート時にトリガールールについて簡単に触れましたが、ここでは実際のルールを見てみましょう。デフォルトでは、Airflow オペレーターは、上流の演算子がすべて正常に完了しない限り実行されません。ALL_DONE トリガールールでは、成功したかどうかにかかわらず、上流の演算子がすべて完了していることのみが必要です。ここでは、Hadoop ジョブが失敗した場合でも、クラスタを破棄することを意味します。
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

最後に、これらの演算子を特定の順序で実行します。これは、Python のビットシフト演算子を使用して指定できます。この場合、create_dataproc_cluster が常に最初に実行され、次に run_dataproc_hadoop、最後に delete_dataproc_cluster が実行されます。

まとめると、コードは次のようになります。

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. Airflow ファイルを Cloud Storage にアップロードする

DAG を /dags フォルダにコピーする

  1. まず、Cloud Shell を開きます。Cloud SDK が便利にインストールされています。
  2. Python サンプル リポジトリのクローンを作成し、composer/workflows ディレクトリに移動します。
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. 次のコマンドを実行して、DAG フォルダの名前を環境変数に設定します。
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. 次の gsutil コマンドを実行して、/dags フォルダの作成場所にチュートリアル コードをコピーします
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

出力は次のようになります。

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. Airflow UI を使用する

GCP コンソールを使用して Airflow ウェブ インターフェースにアクセスするには:

  1. [環境] ページを開きます。
  2. 環境の [Airflow ウェブサーバー] 列で、新しいウィンドウ アイコンをクリックします。新しいウィンドウに Airflow ウェブ UI が表示されます。

Airflow UI について詳しくは、ウェブ インターフェースへのアクセスをご覧ください。

変数を表示する

先ほど設定した変数は、環境内に保持されています。Airflow UI のメニューバーで [管理 > 変数] を選択すると、変数を確認できます。

[List] タブが選択され、次のキーと値を含むテーブルが表示されます。key: gcp_project, value: project-id key: gcs_bucket, value: gs://bucket-name key: gce_zone, value: zone

DAG の実行状況を確認する

DAG ファイルを Cloud Storage の dags フォルダにアップロードすると、ファイルが Cloud Composer によって解析されます。エラーが検出されなかった場合、そのワークフローの名前が DAG のリストに表示され、すぐに実行されるようキューに登録されます。DAG を表示するには、ページの上部にある [DAG] をクリックします。

84a29c71f20bff98.png

composer_hadoop_tutorial をクリックして DAG の詳細ページを開きます。このページでは、ワークフローのタスクと依存関係が図で示されます。

f4f1663c7a37f47c.png

ツールバーで [Graph View] をクリックし、各タスクのグラフィックにカーソルを合わせてステータスを表示します。各タスクを囲む線の色もステータスを表しています(緑は実行中、赤は失敗など)。

4c5a0c6fa9f88513.png

グラフビューからワークフローをもう一度実行するには、次の手順を行います。

  1. Airflow UI のグラフビューで、create_dataproc_cluster グラフィックをクリックします。
  2. [Clear] をクリックして 3 つのタスクをリセットし、[OK] をクリックして確定します。

fd1b23b462748f47.png

次の GCP Console ページに移動して、composer-hadoop-tutorial ワークフローのステータスと結果を確認することもできます。

  • Cloud Dataproc クラスタ: クラスタの作成と削除をモニタリングします。ワークフローによって作成されるクラスタは、ワークフローの間にのみ存在し、最後のワークフロー タスクの一部として削除されます。
  • Cloud Dataproc ジョブ: Apache Hadoop ワードカウント ジョブを表示またはモニタリングします。ジョブ ID をクリックすると、ジョブのログ出力を確認できます。
  • Cloud Storage ブラウザ。この Codelab 用に作成した Cloud Storage バケット内の wordcount フォルダのワードカウントの結果を表示します。

7. クリーンアップ

この Codelab で使用したリソースについて GCP アカウントに課金されないようにする手順は次のとおりです。

  1. (省略可)データを保存するには、Cloud Composer 環境の Cloud Storage バケットと、この Codelab 用に作成したストレージ バケットからデータをダウンロードします。
  2. この Codelab で作成した Cloud Storage バケットを削除します。
  3. 環境の Cloud Storage バケットを削除します
  4. Cloud Composer 環境を削除します。環境を削除しても、環境の Storage バケットは削除されません。

必要に応じて、プロジェクトを削除することもできます。

  1. GCP コンソールで [プロジェクト] ページに移動します。
  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ボックスにプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。