Google Compute Engine 上の Dataproc

1. はじめに - Google Dataproc

Dataproc は、Apache Spark、Apache Flink、Presto をはじめ、他の多くのオープンソース ツールやフレームワークを実行するための、フルマネージドでスケーラビリティの高いサービスです。Dataproc を使用すると、データレイクのモダナイゼーション、ETL / ELT、安全なデータ サイエンスを世界規模で実現できます。Dataproc は、 BigQueryCloud StorageVertex AIDataplex など、複数の Google Cloud サービスとも完全に統合されています。

Dataproc は次の 3 つの形態で利用できます。

  • Dataproc Serverless を使用すると、インフラストラクチャと自動スケーリングを構成しなくても PySpark ジョブを実行できます。Dataproc Serverless は、PySpark バッチ ワークロードとセッション / ノートブックをサポートしています。
  • Google Compute Engine 上の Dataproc を使用すると、Flink や Presto などのオープンソース ツールに加えて、YARN ベースの Spark ワークロード用の Hadoop YARN クラスタを管理できます。自動スケーリングなど、必要なだけ垂直方向または水平方向にスケーリングして、クラウドベースのクラスタを調整できます。
  • Google Kubernetes Engine 上の Dataproc を使用すると、GKE インフラストラクチャで Dataproc 仮想クラスタを構成して、Spark、PySpark、SparkR、Spark SQL ジョブを送信できます。

2. Google Cloud VPC に Dataproc クラスタを作成する

このステップでは、Google Cloud コンソールを使用して Google Cloud に Dataproc クラスタを作成します。

まず、コンソールで Dataproc サービス API を有効にします。有効にしたら、検索バーで「Dataproc」を検索し、[クラスタを作成] をクリックします。

[Compute Engine 上のクラスタ] を選択して、Google Compute Engine(GCE)VM を基盤インフラストラクチャとして使用し、Dataproc クラスタを実行します。

a961b2e8895e88da.jpeg

これで [クラスタの作成] ページが開きます。

9583c91204a09c12.jpeg

このページで、次の操作を行います。

  • クラスタの一意の名前を指定します。
  • 特定のリージョンを選択します。ゾーンを選択することもできますが、Dataproc では自動的に選択することもできます。この Codelab では、「us-central1」と「us-central1-c」を選択します。
  • [標準] クラスタタイプを選択します。これにより、マスターノードが 1 つになります。
  • [ノードを構成] タブで、作成されるワーカーの数が 2 つであることを確認します。
  • [クラスタをカスタマイズ] セクションで、[コンポーネント ゲートウェイを有効にする] の横にあるチェックボックスをオンにします。これにより、Spark UI、Yarn Node Manager、Jupyter ノートブックなど、クラスタ上のウェブ インターフェースにアクセスできるようになります。
  • [オプション コンポーネント] で [Jupyter Notebook] を選択します。これにより、Jupyter ノートブック サーバーでクラスタが構成されます。
  • 他はすべてそのままにして、[クラスタを作成] をクリックします。

これにより、Dataproc クラスタがスピンアップされます。

3. クラスタを起動して SSH で接続する

クラスタのステータスが [実行中] に変わったら、Dataproc コンソールでクラスタ名をクリックします。

7332f1c2cb25807d.jpeg

[VM インスタンス] タブをクリックして、クラスタのマスターノードと 2 つのワーカーノードを表示します。

25be1578e00f669f.jpeg

マスターノードの横にある [SSH] をクリックして、マスターノードにログインします。

2810ffd97f315bdb.jpeg

hdfs コマンドを実行して、ディレクトリ構造を確認します。

hadoop_commands_example

sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51 
sudo hadoop fs -ls /

4. ウェブ インターフェースとコンポーネント ゲートウェイ

[Dataproc cluster console] でクラスタ名をクリックし、[ウェブ インターフェース] タブをクリックします。

6398f71d6293d6ff.jpeg

使用可能なウェブ インターフェース(Jupyter など)が表示されます。[Jupyter] をクリックして Jupyter ノートブックを開きます。これを使用して、GCS に保存されている PySpark でノートブックを作成できます。ノートブックを Google Cloud Storage に保存し、この Codelab で使用する PySpark ノートブックを開きます。

5. Spark ジョブをモニタリングして確認する

Dataproc クラスタが起動して実行されたら、PySpark バッチジョブを作成し、そのジョブを Dataproc クラスタに送信します。

PySpark スクリプトを保存する Google Cloud Storage(GCS)バケットを作成します。バケットは、Dataproc クラスタと同じリージョンに作成してください。

679fd2f76806f4e2.jpeg

GCS バケットが作成されたら、次のファイルをこのバケットにコピーします。

https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py

このスクリプトは、サンプル Spark DataFrame を作成し、Hive テーブルとして書き込みます。

hive_job.py

from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row

spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()

df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
    ], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")

このスクリプトを Dataproc で Spark バッチジョブとして送信します。左側のナビゲーション メニューで [ジョブ] をクリックし、[ジョブを送信] をクリックします。

5767fc7c50b706d3.jpeg

[**ジョブ ID**] と [**リージョン**] を指定します。クラスタを選択し、コピーした Spark スクリプトの GCS ロケーションを指定します。このジョブは、Dataproc で Spark バッチジョブとして実行されます。

[プロパティ] で、キー spark.submit.deployMode と値 client を追加して、ドライバがワーカーノードではなく Dataproc マスターノードで実行されるようにします。[送信] をクリックして、バッチジョブを Dataproc に送信します。

a7ca90f5132faa31.jpeg

Spark スクリプトは Dataframe を作成し、Hive テーブル test_table_1 に書き込みます。

ジョブが正常に実行されると、[モニタリング] タブにコンソールの出力ステートメントが表示されます。

bdec2f3ae1055f9.jpeg

Hive テーブルが作成されたら、別の Hive クエリジョブを送信して、テーブルの内容を選択し、コンソールに表示します。

次のプロパティを使用して別のジョブを作成します。

c16f02d1b3afaa27.jpeg

[ジョブタイプ] が [Hive] に設定され、クエリソースタイプが [クエリテキスト] に設定されていることに注意してください。これは、[クエリテキスト] テキスト ボックスに HiveQL ステートメント全体を記述することを意味します。

残りのパラメータはデフォルトのままにして、ジョブを送信します。

e242e50bc2519bf4.jpeg

HiveQL がすべてのレコードを選択してコンソールに表示されることを確認します。

6. 自動スケーリング

自動スケーリングは、ワークロードに「適切」な数のクラスタ ワーカーノードを推定するタスクです。

Dataproc AutoscalingPolicies API を使用すると、クラスタ リソースの管理を自動化するメカニズムが提供され、クラスタ ワーカー VM の自動スケーリングが可能になります。自動スケーリング ポリシーは、そのポリシーを使用するクラスタ ワーカーのスケーリング方法を指定するための、再利用可能な構成です。スケーリングの境界や頻度、積極性を定義し、クラスタ存続期間中のクラスタ リソースをきめ細かく制御します。

Dataproc 自動スケーリング ポリシーは YAML ファイルを使用して記述されます。これらの YAML ファイルは、クラスタを作成するための CLI コマンドで渡されるか、Cloud Console からクラスタを作成するときに GCS バケットから選択されます。

Dataproc 自動スケーリング ポリシーの例を次に示します。

policy.yaml

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

7. Dataproc オプション コンポーネントを構成する

これにより、Dataproc クラスタがスピンアップされます。

Dataproc クラスタを作成すると、標準の Apache Hadoop エコシステム コンポーネントが自動的にクラスタにインストールされます(Dataproc バージョン リストをご覧ください)。クラスタを作成する際に、[オプション コンポーネント] と呼ばれる追加コンポーネントをクラスタにインストールすることもできます。

e39cc34245af3f01.jpeg

コンソールから Dataproc クラスタを作成する際に、オプション コンポーネントを有効にし、オプション コンポーネントとして [Jupyter Notebook] を選択しました。

8. リソースのクリーンアップ

クラスタをクリーンアップするには、Dataproc コンソールでクラスタを選択し、[停止] をクリックします。クラスタが停止したら、[削除] をクリックしてクラスタを削除します。

Dataproc クラスタを削除したら、コードがコピーされた GCS バケットを削除します。

リソースをクリーンアップして不要な課金を停止するには、まず Dataproc クラスタを停止してから削除する必要があります。

クラスタを停止して削除する前に、HDFS ストレージに書き込まれたすべてのデータが、永続ストレージ用に GCS にコピーされていることを確認してください。

クラスタを停止するには、[停止] をクリックします。

52065de928ab52e7.jpeg

クラスタが停止したら、[削除] をクリックしてクラスタを削除します。

確認ダイアログで [削除] をクリックして、クラスタを削除します。

52065de928ab52e7.jpeg