Menjalankan tugas jumlah kata Hadoop di cluster Dataproc

1. Pengantar

Alur kerja adalah kasus penggunaan umum dalam analitik data - alur kerja melibatkan penyerapan, transformasi, dan analisis data untuk menemukan informasi yang bermakna di dalamnya. Di Google Cloud Platform, alat untuk mengorkestrasi alur kerja adalah Cloud Composer, yang merupakan versi terhosting dari alat alur kerja open source Apache Airflow yang populer. Di lab ini, Anda akan menggunakan Cloud Composer untuk membuat alur kerja sederhana yang membuat cluster Cloud Dataproc, menganalisisnya menggunakan Cloud Dataproc dan Apache Hadoop, lalu menghapus cluster Cloud Dataproc setelahnya.

Apa itu Cloud Composer?

Cloud Composer adalah layanan orkestrasi alur kerja yang terkelola sepenuhnya, yang memungkinkan Anda membuat, menjadwalkan, dan memantau pipeline yang tersebar di cloud dan pusat data lokal. Dikembangkan dari project open source Apache Airflow yang populer dan dioperasikan menggunakan bahasa pemrograman Python, Cloud Composer bebas dari lock-in dan mudah digunakan.

Dengan menggunakan Cloud Composer, bukan instance lokal Apache Airflow, pengguna dapat memanfaatkan Airflow terbaik tanpa perlu overhead penginstalan atau pengelolaan.

Apa itu Apache Airflow?

Apache Airflow adalah alat open source yang digunakan untuk membuat, menjadwalkan, dan memantau alur kerja secara terprogram. Ada beberapa istilah kunci yang perlu diingat terkait Airflow yang akan Anda lihat di sepanjang lab:

  • DAG - DAG (Directed Acyclic Graph) adalah kumpulan tugas terorganisir yang ingin Anda jadwalkan dan jalankan. DAG, yang juga disebut alur kerja, ditentukan dalam file Python standar
  • Operator - operator menjelaskan satu tugas dalam alur kerja

Apa itu Cloud Dataproc?

Cloud Dataproc adalah layanan Apache Spark dan Apache Hadoop yang terkelola sepenuhnya dari Google Cloud Platform. Cloud Dataproc mudah terintegrasi dengan layanan GCP lainnya, sehingga Anda mendapatkan platform canggih dan lengkap untuk pemrosesan data, analisis, dan machine learning.

Yang akan Anda lakukan

Codelab ini menampilkan cara membuat dan menjalankan alur kerja Apache Airflow di Cloud Composer yang menyelesaikan tugas berikut:

  • Membuat cluster Cloud Dataproc
  • Menjalankan tugas jumlah kata Apache Hadoop di cluster, dan mengeluarkan hasilnya ke Cloud Storage
  • Menghapus cluster

Yang akan Anda pelajari

  • Cara membuat dan menjalankan alur kerja Apache Airflow di Cloud Composer
  • Cara menggunakan Cloud Composer dan Cloud Dataproc untuk menjalankan analisis pada set data
  • Cara mengakses lingkungan Cloud Composer melalui Google Cloud Platform Console, Cloud SDK, dan antarmuka web Airflow

Yang Anda butuhkan

  • Akun GCP
  • Pengetahuan CLI dasar
  • Pemahaman dasar tentang Python

2. Menyiapkan GCP

Membuat Project

Pilih atau buat Project Google Cloud Platform.

Catat Project ID Anda, yang akan digunakan pada langkah-langkah berikutnya.

Jika Anda membuat project baru, project ID dapat ditemukan tepat di bawah Nama Project pada halaman pembuatan

Jika sudah membuat project, Anda dapat menemukan ID di halaman beranda konsol pada kartu Info Project

Aktifkan API

Aktifkan Cloud Composer, Cloud Dataproc, dan Cloud Storage API. Setelah diaktifkan, Anda dapat mengabaikan tombol yang bertuliskan "Buka Kredensial" dan melanjutkan ke langkah berikutnya dalam tutorial.

Membuat Lingkungan Composer

Buat lingkungan Cloud Composer dengan konfigurasi berikut:

  • Nama: my-composer-environment
  • Lokasi: us-central1
  • Zona: us-central1-a

Semua konfigurasi lainnya dapat tetap pada setelan default-nya. Klik "Buat" di bagian bawah.

Membuat Bucket Cloud Storage

Di project Anda, buat bucket Cloud Storage dengan konfigurasi berikut:

  • Nama: <your-project-id>
  • Kelas penyimpanan default: Multi-regional
  • Lokasi: Amerika Serikat
  • Model Kontrol Akses: terperinci

Tekan "Buat" jika Anda sudah siap

3. Menyiapkan Apache Airflow

Melihat Informasi Lingkungan Composer

Di GCP Console, buka halaman Lingkungan

Klik nama lingkungan untuk melihat detailnya.

Halaman Environment details menyediakan informasi, seperti URL antarmuka web Airflow, ID cluster Google Kubernetes Engine, nama bucket Cloud Storage, dan jalur untuk folder /dags.

Di Airflow, DAG (Directed Acyclic Graph) adalah kumpulan tugas terorganisir yang ingin Anda jadwalkan dan jalankan. DAG, juga disebut alur kerja, ditentukan dalam file Python standar. Cloud Composer hanya menjadwalkan DAG di folder /dags. Folder /dags berada di bucket Cloud Storage yang dibuat secara otomatis oleh Cloud Composer saat Anda membuat lingkungan.

Menetapkan Variabel Lingkungan Apache Airflow

Variabel Apache Airflow adalah konsep khusus Airflow yang berbeda dengan variabel lingkungan. Pada langkah ini, Anda akan menetapkan tiga variabel Airflow berikut: gcp_project, gcs_bucket, dan gce_zone.

Menggunakan gcloud untuk Menetapkan Variabel

Pertama, buka Cloud Shell, yang telah menginstal Cloud SDK dengan mudah untuk Anda.

Tetapkan variabel lingkungan COMPOSER_INSTANCE ke nama lingkungan Composer Anda

COMPOSER_INSTANCE=my-composer-environment

Untuk menetapkan variabel Airflow menggunakan alat command line gcloud, gunakan perintah gcloud composer environments run dengan sub-perintah variables. Perintah gcloud composer ini mengeksekusi sub-perintah Airflow CLI variables. Sub-perintah meneruskan argumen ke alat command line gcloud.

Anda akan menjalankan perintah ini tiga kali, dengan mengganti variabel dengan variabel yang relevan dengan project Anda.

Tetapkan gcp_project menggunakan perintah berikut, dengan mengganti <your-project-id> dengan project ID yang Anda catat di Langkah 2.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

Output Anda akan terlihat seperti ini

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

Tetapkan gcs_bucket menggunakan perintah berikut, dengan mengganti <your-bucket-name> dengan ID bucket yang Anda catat di Langkah 2. Jika mengikuti rekomendasi kami, nama bucket Anda sama dengan project ID Anda. Output Anda akan serupa dengan perintah sebelumnya.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

Tetapkan gce_zone menggunakan perintah berikut. Output Anda akan mirip dengan perintah sebelumnya.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(Opsional) Menggunakan gcloud untuk melihat variabel

Untuk melihat nilai variabel, jalankan sub-perintah Airflow CLI variables dengan argumen get atau gunakan UI Airflow.

Contoh:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

Anda dapat melakukannya dengan salah satu dari tiga variabel yang baru saja Anda tetapkan: gcp_project, gcs_bucket, dan gce_zone.

4. Contoh Alur Kerja

Mari kita lihat kode untuk DAG yang akan digunakan di langkah 5. Jangan khawatir tentang mengunduh file, cukup ikuti di sini.

Ada banyak hal yang perlu dijelaskan di sini, jadi mari kita uraikan sedikit.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

Kita mulai dengan beberapa impor Airflow:

  • airflow.models - Memungkinkan kami mengakses dan membuat data di database Airflow.
  • airflow.contrib.operators - Tempat operator dari komunitas tinggal. Dalam hal ini, kita memerlukan dataproc_operator untuk mengakses Cloud Dataproc API.
  • airflow.utils.trigger_rule - Untuk menambahkan aturan pemicu ke operator. Aturan pemicu memungkinkan kontrol mendetail atas apakah operator harus melakukan eksekusi terkait dengan status induknya.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

Ini menentukan lokasi file output kita. Baris yang penting di sini adalah models.Variable.get('gcs_bucket') yang akan mengambil nilai variabel gcs_bucket dari database Airflow.

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR - Lokasi file .jar yang pada akhirnya akan kita jalankan di cluster Cloud Dataproc. Aplikasi ini sudah dihosting di GCP untuk Anda.
  • input_file - Lokasi file yang berisi data yang akan dihitung oleh tugas Hadoop kita. Kita akan mengupload data ke lokasi tersebut bersama-sama di Langkah 5.
  • wordcount_args - Argumen yang akan kita teruskan ke file jar.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

Tindakan ini akan memberi kita objek datetime yang setara yang mewakili tengah malam pada hari sebelumnya. Misalnya, jika ini dieksekusi pada pukul 11.00 pada 4 Maret, objek datetime akan mewakili 00:00 pada 3 Maret. Hal ini berkaitan dengan cara Airflow menangani penjadwalan. Info selengkapnya dapat ditemukan di sini.

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

Variabel default_dag_args dalam bentuk kamus harus diberikan setiap kali DAG baru dibuat:

  • 'email_on_failure' - Menunjukkan apakah peringatan email harus dikirimkan saat tugas gagal
  • 'email_on_retry' - Menunjukkan apakah peringatan email harus dikirim saat tugas dicoba lagi
  • 'retries' - Menunjukkan jumlah percobaan ulang yang harus dilakukan Airflow jika terjadi kegagalan DAG
  • 'retry_delay' - Menunjukkan berapa lama Airflow harus menunggu sebelum mencoba lagi
  • 'project_id' - Memberi tahu DAG tentang Project ID GCP yang akan dikaitkan, yang nantinya akan diperlukan dengan Operator Dataproc
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Penggunaan with models.DAG akan memberi tahu skrip untuk menyertakan semua yang ada di bawahnya di dalam DAG yang sama. Kita juga melihat tiga argumen yang diteruskan dalam:

  • Yang pertama, string, adalah nama untuk memberikan DAG yang kita buat. Dalam hal ini, kita menggunakan composer_hadoop_tutorial.
  • schedule_interval - Objek datetime.timedelta, yang di sini telah kita tetapkan ke satu hari. Artinya, DAG ini akan mencoba dieksekusi sekali sehari setelah 'start_date' yang ditetapkan sebelumnya di 'default_dag_args'
  • default_args - Kamus yang kita buat sebelumnya yang berisi argumen default untuk DAG

Membuat Cluster Dataproc

Selanjutnya, kita akan membuat dataproc_operator.DataprocClusterCreateOperator yang membuat Cluster Cloud Dataproc.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

Dalam operator ini, kita melihat beberapa argumen, kecuali yang pertama khusus untuk operator ini:

  • task_id - Sama seperti di BashOperator, ini adalah nama yang kita tetapkan ke operator, yang dapat dilihat dari UI Airflow
  • cluster_name - Nama yang kami tetapkan untuk cluster Cloud Dataproc. Di sini, kami menamainya composer-hadoop-tutorial-cluster-{{ ds_nodash }} (lihat kotak info untuk informasi tambahan opsional)
  • num_workers - Jumlah worker yang kami alokasikan ke cluster Cloud Dataproc
  • zone - Wilayah geografis tempat kita ingin cluster berada, seperti yang disimpan dalam database Airflow. Tindakan ini akan membaca variabel 'gce_zone' yang kita tetapkan di Langkah 3
  • master_machine_type - Jenis mesin yang ingin dialokasikan ke master Cloud Dataproc
  • worker_machine_type - Jenis mesin yang ingin dialokasikan ke pekerja Cloud Dataproc

Mengirim Tugas Apache Hadoop

dataproc_operator.DataProcHadoopOperator memungkinkan kita mengirimkan tugas ke cluster Cloud Dataproc.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

Kami menyediakan beberapa parameter:

  • task_id - Nama yang kami tetapkan ke bagian DAG ini
  • main_jar - Lokasi file .jar yang ingin kita jalankan terhadap cluster
  • cluster_name - Nama cluster yang akan menjalankan tugas, yang akan Anda lihat sama dengan yang kita temukan di operator sebelumnya
  • arguments - Argumen yang diteruskan ke file jar, seperti yang Anda lakukan jika menjalankan file .jar dari command line

Menghapus Cluster

Operator terakhir yang akan kita buat adalah dataproc_operator.DataprocClusterDeleteOperator.

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

Seperti namanya, operator ini akan menghapus cluster Cloud Dataproc yang ditentukan. Kita melihat tiga argumen di sini:

  • task_id - Sama seperti di BashOperator, ini adalah nama yang kita tetapkan ke operator, yang dapat dilihat dari UI Airflow
  • cluster_name - Nama yang kami berikan untuk cluster Cloud Dataproc. Di sini kami menamainya composer-hadoop-tutorial-cluster-{{ ds_nodash }} (lihat kotak info setelah "Membuat Cluster Dataproc" untuk informasi tambahan opsional)
  • trigger_rule - Kita telah menyebutkan Aturan Pemicu secara singkat selama impor pada awal langkah ini, tetapi di sini kita telah menerapkannya. Secara default, operator Airflow tidak dijalankan kecuali jika semua operator upstreamnya telah berhasil diselesaikan. Aturan pemicu ALL_DONE hanya mewajibkan semua operator upstream telah selesai, terlepas dari apakah operator tersebut berhasil atau tidak. Hal ini berarti bahwa meskipun tugas Hadoop gagal, kita masih ingin menghancurkan cluster.
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Terakhir, kita ingin operator ini dieksekusi dalam urutan tertentu, dan kita dapat menunjukkannya dengan menggunakan operator bitshift Python. Dalam hal ini, create_dataproc_cluster akan selalu dieksekusi terlebih dahulu, diikuti oleh run_dataproc_hadoop dan terakhir delete_dataproc_cluster.

Dengan menggabungkan semuanya, kodenya akan terlihat seperti ini:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. Mengupload File Airflow ke Cloud Storage

Menyalin DAG ke Folder /dags

  1. Pertama, buka Cloud Shell, dengan Cloud SDK yang sudah diinstal dengan mudah untuk Anda.
  2. Buat clone untuk repositori sampel python, lalu ubah ke direktori composer/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. Jalankan perintah berikut untuk menetapkan nama folder DAG ke variabel lingkungan
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. Jalankan perintah gsutil berikut untuk menyalin kode tutorial ke tempat folder /dags Anda dibuat
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

Output Anda akan tampak seperti berikut ini:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. Menggunakan UI Airflow

Untuk mengakses antarmuka web Airflow menggunakan GCP Console:

  1. Buka halaman Environments.
  2. Pada kolom Airflow webserver untuk lingkungan, klik ikon jendela baru. UI web Airflow terbuka di jendela browser baru.

Untuk mengetahui informasi tentang UI Airflow, lihat Mengakses antarmuka web.

Lihat Variabel

Variabel yang Anda tetapkan sebelumnya akan tetap ada di lingkungan Anda. Anda dapat melihat variabel dengan memilih Admin > Variabel dari panel menu UI Airflow.

Tab daftar dipilih dan menampilkan tabel dengan kunci dan nilai berikut kunci: gcp_project, value: project-id key: gcs_bucket, value: gs://bucket-name key: gce_zone, value: zone

Menjelajahi Operasi DAG

Saat Anda mengupload file DAG ke folder dags di Cloud Storage, Cloud Composer akan menguraikan file tersebut. Jika tidak ditemukan error, nama alur kerja akan muncul di daftar DAG, dan alur kerja dimasukkan dalam antrean untuk segera dijalankan. Untuk melihat DAG, klik DAG di bagian atas halaman.

84a29c71f20bff98.png

Klik composer_hadoop_tutorial untuk membuka halaman detail DAG. Halaman ini menyertakan representasi grafis dari tugas dan dependensi alur kerja.

f4f1663c7a37f47c.png

Sekarang, di toolbar, klik Tampilan Grafik, lalu arahkan mouse ke gambar setiap tugas untuk melihat statusnya. Perhatikan bahwa batas di sekitar setiap tugas juga menunjukkan statusnya (hijau = running; merah = failed, etc.).

4c5a0c6fa9f88513.png

Untuk menjalankan alur kerja lagi dari Tampilan Grafik:

  1. Di Tampilan Grafik UI Airflow, klik grafis create_dataproc_cluster.
  2. Klik Clear untuk mereset ketiga tugas, lalu klik OK untuk mengonfirmasi.

fd1b23b462748f47.png

Anda juga dapat memeriksa status dan hasil alur kerja composer-hadoop-tutorial dengan membuka halaman GCP Console berikut:

  • Cluster Cloud Dataproc untuk memantau pembuatan dan penghapusan cluster. Perlu diperhatikan bahwa cluster yang dibuat oleh alur kerja bersifat sementara: cluster hanya ada selama durasi alur kerja dan dihapus sebagai bagian dari tugas alur kerja terakhir.
  • Cloud Dataproc Jobs untuk melihat atau memantau tugas jumlah kata Apache Hadoop. Klik ID Pekerjaan untuk melihat output log tugas.
  • Cloud Storage Browser untuk melihat hasil jumlah kata di folder wordcount di bucket Cloud Storage yang Anda buat untuk codelab ini.

7. Pembersihan

Agar tidak menimbulkan biaya pada akun GCP Anda untuk resource yang digunakan dalam codelab ini:

  1. (Opsional) Untuk menyimpan data, download data dari bucket Cloud Storage untuk lingkungan Cloud Composer dan bucket penyimpanan yang Anda buat untuk codelab ini.
  2. Hapus bucket Cloud Storage yang Anda buat untuk codelab ini.
  3. Hapus bucket Cloud Storage untuk lingkungan.
  4. Hapus lingkungan Cloud Composer. Perhatikan bahwa menghapus lingkungan tidak akan menghapus bucket penyimpanan untuk lingkungan tersebut.

Anda juga dapat menghapus project secara opsional:

  1. Di GCP Console, buka halaman Project.
  2. Dalam daftar project, pilih project yang ingin dihapus, lalu klik Hapus.
  3. Di kotak, ketik project ID, lalu klik Shut down untuk menghapus project.