1. Einführung
Workflows sind ein häufiger Anwendungsfall in der Data Analytics – sie umfassen die Aufnahme, Transformation und Analyse von Daten, um die darin enthaltenen Informationen zu finden. Auf der Google Cloud Platform werden Workflows in Cloud Composer ausgeführt, einer gehosteten Version des beliebten Open-Source-Workflowtools Apache Airflow. In diesem Lab erstellen Sie mit Cloud Composer einen einfachen Workflow, der einen Cloud Dataproc-Cluster erstellt, mit Cloud Dataproc und Apache Hadoop analysiert und anschließend den Cloud Dataproc-Cluster löscht.
Was ist Cloud Composer?
Cloud Composer ist ein vollständig verwalteter Workflow-Orchestrierungsdienst, mit dem Sie Pipelines, die sich über Clouds und lokale Rechenzentren erstrecken, erstellen, planen und überwachen können. Cloud Composer basiert auf dem beliebten Open-Source-Projekt Apache Airflow und wird in der Programmiersprache Python betrieben. Cloud Composer ist einfach zu verwenden und anbieterunabhängig.
Wenn Sie Cloud Composer anstelle einer lokalen Instanz von Apache Airflow verwenden, profitieren Nutzer von den Vorteilen von Airflow ohne Installations- oder Verwaltungsaufwand.
Was ist Apache Airflow?
Apache Airflow ist ein Open-Source-Tool, mit dem Workflows programmatisch erstellt, geplant und überwacht werden können. Im Zusammenhang mit Airflow sind einige wichtige Begriffe zu beachten, die in diesem Lab immer wieder begegnet sind:
- DAG: Ein DAG (Directed Acyclic Graph) ist eine Sammlung strukturierter Aufgaben, die Sie planen und ausführen möchten. DAGs, auch Workflows genannt, werden in Standard-Python-Dateien definiert.
- Operator: Ein Operator beschreibt eine einzelne Aufgabe in einem Workflow.
Was ist Cloud Dataproc?
Cloud Dataproc ist der vollständig verwaltete Apache Spark- und Apache Hadoop -Dienst der Google Cloud Platform. Cloud Dataproc lässt sich einfach in andere GCP-Dienste einbinden und bietet Ihnen eine leistungsstarke und umfassende Plattform für Datenverarbeitung, Analysen und maschinelles Lernen.
Vorgehensweise
In diesem Codelab erfahren Sie, wie Sie in Cloud Composer einen Apache Airflow-Workflow erstellen und ausführen, der die folgenden Aufgaben erledigt:
- Erstellt einen Cloud Dataproc-Cluster
- Führt einen Apache Hadoop-Wordcount-Job im Cluster aus und gibt die Ergebnisse an Cloud Storage aus
- Löscht den Cluster
Aufgaben in diesem Lab
- Apache Airflow-Workflow in Cloud Composer erstellen und ausführen
- Cloud Composer und Cloud Dataproc zum Ausführen einer Analyse für ein Dataset verwenden
- Wie Sie über die Google Cloud Platform Console, das Cloud SDK und die Airflow-Weboberfläche auf Ihre Cloud Composer-Umgebung zugreifen
Voraussetzungen
- GCP-Konto
- Grundlegende CLI-Kenntnisse
- Grundlegendes Verständnis von Python
2. GCP einrichten
Projekt erstellen
Wählen Sie ein Google Cloud Platform-Projekt aus oder erstellen Sie eines.
Notieren Sie sich Ihre Projekt-ID. Sie benötigen sie in späteren Schritten.
Wenn Sie ein neues Projekt erstellen, finden Sie die Projekt-ID direkt unter dem Projektnamen auf der Erstellungsseite. | |
Wenn Sie bereits ein Projekt erstellt haben, finden Sie die ID auf der Console-Startseite auf der Karte „Projektinformationen“. |
APIs aktivieren
Aktivieren Sie die Cloud Composer API, die Cloud Dataproc API und die Cloud Storage API. Anschließend können Sie die Schaltfläche „Zu den Anmeldedaten“ ignorieren und mit dem nächsten Schritt der Anleitung fortfahren. |
Composer-Umgebung erstellen
Erstellen Sie eine Cloud Composer-Umgebung mit der folgenden Konfiguration:
Bei allen anderen Konfigurationen können die Standardeinstellungen beibehalten werden. Klicken Sie unten auf „Erstellen“. |
Cloud Storage-Bucket erstellen
Erstellen Sie in Ihrem Projekt einen Cloud Storage-Bucket mit der folgenden Konfiguration:
Klicken Sie auf „Erstellen“, wenn Sie fertig sind. |
3. Apache Airflow einrichten
Informationen zur Composer-Umgebung aufrufen
Öffnen Sie in der GCP Console die Seite Umgebungen.
Klicken Sie auf den Namen der Umgebung, um die Details aufzurufen.
Die Seite Umgebungsdetails enthält verschiedene Informationen, wie die URL der Airflow-Benutzeroberfläche, die Cluster-ID der Google Kubernetes Engine, den Namen des Cloud Storage-Buckets und den Pfad zum Ordner „/dags“.
In Airflow ist ein DAG (Directed Acyclic Graph) eine Sammlung strukturierter Aufgaben, die Sie planen und ausführen möchten. DAGs, auch Workflows genannt, werden in Standard-Python-Dateien definiert. Cloud Composer plant nur die DAGs im Ordner „/dags“. Der Ordner „/dags“ befindet sich im Cloud Storage-Bucket, den Cloud Composer beim Erstellen der Umgebung automatisch erstellt.
Apache Airflow-Umgebungsvariablen festlegen
Apache Airflow-Variablen sind ein Airflow-spezifisches Konzept und unterscheiden sich von Umgebungsvariablen. In diesem Schritt legen Sie die folgenden drei Airflow-Variablen fest: gcp_project
, gcs_bucket
und gce_zone
.
gcloud
zum Festlegen von Variablen
Öffnen Sie zuerst Cloud Shell. Dort ist das Cloud SDK komfortabel installiert.
Legen Sie die Umgebungsvariable COMPOSER_INSTANCE
auf den Namen Ihrer Composer-Umgebung fest.
COMPOSER_INSTANCE=my-composer-environment
Um Airflow-Variablen mit dem gcloud-Befehlszeilentool festzulegen, verwenden Sie den Befehl gcloud composer environments run
mit dem Unterbefehl variables
. Mit diesem gcloud composer
-Befehl wird der Unterbefehl variables
der Airflow-Befehlszeile ausgeführt. Dabei werden die Argumente an das gcloud
-Befehlszeilentool übergeben.
Sie führen diesen Befehl dreimal aus und ersetzen die Variablen durch die für Ihr Projekt relevanten Variablen.
Legen Sie gcp_project
mit dem folgenden Befehl fest und ersetzen Sie dabei <your-project-id> durch die Projekt-ID, die Sie sich in Schritt 2 notiert haben.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcp_project <your-project-id>
Ihre Ausgabe sieht in etwa so aus:
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke. Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc [2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449 [2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database). [2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor [2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied. [2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg [2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
Legen Sie gcs_bucket
mit dem folgenden Befehl fest und ersetzen Sie <your-bucket-name>
durch die Bucket-ID, die Sie sich in Schritt 2 notiert haben. Wenn Sie unserer Empfehlung gefolgt sind, entspricht der Bucket-Name Ihrer Projekt-ID. Die Ausgabe sollte in etwa so aussehen wie beim vorherigen Befehl.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
Legen Sie den gce_zone
mit dem folgenden Befehl fest. Die Ausgabe sieht in etwa den vorherigen Befehlen aus.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gce_zone us-central1-a
Optional: gcloud
verwenden, um eine Variable aufzurufen
Führen Sie den Airflow-CLI-Unterbefehl variables
mit dem Argument get
aus oder verwenden Sie die Airflow-UI, um den Wert einer Variablen abzurufen.
Beispiel:
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --get gcs_bucket
Dazu können Sie jede der drei Variablen verwenden, die Sie gerade festgelegt haben: gcp_project
, gcs_bucket
und gce_zone
.
4. Beispiel-Workflow
Sehen wir uns den Code für den DAG an, den wir in Schritt 5 verwenden werden. Du brauchst dir noch keine Gedanken über das Herunterladen von Dateien zu machen. Folge einfach den Schritten hier.
Hier gibt es so viel zu packen, schauen wir uns das etwas genauer an.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
Wir beginnen mit einigen Airflow-Importen:
airflow.models
: Damit können wir auf Daten in der Airflow-Datenbank zugreifen und Daten darin erstellen.airflow.contrib.operators
: Hier befinden sich Betreiber aus der Community. In diesem Fall benötigen wir dasdataproc_operator
, um auf die Cloud Dataproc API zuzugreifen.airflow.utils.trigger_rule
: Zum Hinzufügen von Triggerregeln zu unseren Operatoren. Mit Triggerregeln können Sie genau steuern, ob ein Operator im Verhältnis zum Status seines übergeordneten Elements ausgeführt werden soll.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
Gibt den Speicherort unserer Ausgabedatei an. Die wichtige Zeile hier ist models.Variable.get('gcs_bucket')
. Mit dieser Zeile wird der Wert der Variablen gcs_bucket
aus der Airflow-Datenbank abgerufen.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR
: Speicherort der JAR-Datei, die später im Cloud Dataproc-Cluster ausgeführt wird. Es wird bereits für Sie in der Google Cloud gehostet.input_file
: Speicherort der Datei, die die Daten enthält, mit denen unser Hadoop-Job die Daten verarbeiten soll. Wir laden die Daten gemeinsam in Schritt 5 an diesen Speicherort hoch.wordcount_args
: Argumente, die an die JAR-Datei übergeben werden.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
Das entspricht einem datetime-Objekt, das Mitternacht des Vortags darstellt. Wenn dies beispielsweise am 4. März um 11:00 Uhr ausgeführt wird, würde das Datetime-Objekt am 3. März 00:00 darstellen. Das hängt mit der Planung in Airflow zusammen. Weitere Informationen
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
Die Variable default_dag_args
in Form eines Wörterbuchs sollte jedes Mal angegeben werden, wenn ein neuer DAG erstellt wird:
'email_on_failure'
: gibt an, ob E-Mail-Benachrichtigungen gesendet werden sollen, wenn eine Aufgabe fehlgeschlagen ist'email_on_retry'
: gibt an, ob E-Mail-Warnungen gesendet werden sollen, wenn eine Aufgabe wiederholt wird.'retries'
– Gibt an, wie oft Airflow im Falle eines DAG-Fehlers Wiederholungsversuche ausführen soll.'retry_delay'
: Gibt an, wie lange Airflow warten soll, bevor ein neuer Versuch unternommen wird'project_id'
: Informiert den DAG über die ID des GCP-Projekts, mit dem er verknüpft werden soll. Diese ID wird später mit dem Dataproc-Operator benötigt.
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
Wenn Sie with models.DAG
verwenden, wird dem Script mitgeteilt, alles darunter in denselben DAG aufzunehmen. Außerdem werden drei Argumente übergeben:
- Der erste, ein String, ist der Name für den zu erstellenden DAG. In diesem Fall verwenden wir
composer_hadoop_tutorial
. schedule_interval
– Eindatetime.timedelta
-Objekt, für das wir hier einen Tag festgelegt haben. Das bedeutet, dass dieser DAG einmal täglich nach dem'start_date'
versucht, der zuvor in'default_dag_args'
festgelegt wurdedefault_args
– das zuvor erstellte Wörterbuch mit den Standardargumenten für den DAG
Dataproc-Cluster erstellen
Als Nächstes erstellen wir eine dataproc_operator.DataprocClusterCreateOperator
, mit der ein Cloud Dataproc-Cluster erstellt wird.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
Innerhalb dieses Operators gibt es einige Argumente, bis auf das erste sind alle für diesen Operator spezifisch:
task_id
: Genau wie beim BashOperator weisen wir dem Operator diesen Namen zu, der über die Airflow-Benutzeroberfläche angezeigt wird.cluster_name
: Der Name, der dem Cloud Dataproc-Cluster zugewiesen wird. Hier haben wir escomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
genannt (optionale zusätzliche Informationen finden Sie im Infofeld).num_workers
– Anzahl der Worker, die dem Cloud Dataproc-Cluster zugewiesen werdenzone
: Die in der Airflow-Datenbank gespeicherte geografische Region, in der der Cluster gespeichert werden soll. Dadurch wird die Variable'gce_zone'
gelesen, die wir in Schritt 3 festgelegt haben.master_machine_type
– Der Maschinentyp, der dem Cloud Dataproc-Master zugewiesen werden soll.worker_machine_type
– Der Maschinentyp, der dem Cloud Dataproc-Worker zugewiesen werden soll.
Apache Hadoop-Job einreichen
Mit dataproc_operator.DataProcHadoopOperator
kann ein Job an einen Cloud Dataproc-Cluster gesendet werden.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
Wir bieten mehrere Parameter an:
task_id
– Name, den wir diesem Teil des DAG zuweisenmain_jar
– Speicherort der JAR-Datei, die im Cluster ausgeführt werden sollcluster_name
: Name des Clusters, in dem der Job ausgeführt werden soll. Wie Sie sehen, stimmt dieser mit dem des vorherigen Operators überein.arguments
: Argumente, die an die JAR-Datei übergeben werden, wie bei der Ausführung der JAR-Datei über die Befehlszeile
Cluster löschen
Der letzte Operator, den wir erstellen, ist der dataproc_operator.DataprocClusterDeleteOperator
.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
Wie der Name schon sagt, löscht dieser Operator einen bestimmten Cloud Dataproc-Cluster. Hier sehen wir drei Argumente:
task_id
: Wie beim BashOperator ist dies der Name, den wir dem Operator zuweisen. Er ist in der Airflow-Benutzeroberfläche zu sehen.cluster_name
: Der Name, der dem Cloud Dataproc-Cluster zugewiesen wird. Hier haben wir ihncomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
genannt. Optionale zusätzliche Informationen finden Sie im Infofeld nach „Dataproc-Cluster erstellen“.trigger_rule
: Wir haben Triggerregeln bereits kurz bei den Importen zu Beginn dieses Schritts erwähnt. Hier sehen Sie eine in Aktion. Standardmäßig wird ein Airflow-Operator nur ausgeführt, wenn alle vorgelagerten Operatoren erfolgreich ausgeführt wurden. Für die TriggerregelALL_DONE
ist nur erforderlich, dass alle vorgelagerten Operatoren abgeschlossen wurden, unabhängig davon, ob sie erfolgreich waren oder nicht. Auch wenn der Hadoop-Job fehlgeschlagen ist, wollen wir den Cluster also herunterreißen.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
Schließlich möchten wir, dass diese Operatoren in einer bestimmten Reihenfolge ausgeführt werden, und wir können dies mit Python-Bitverschiebeoperatoren angeben. In diesem Fall wird create_dataproc_cluster
immer zuerst ausgeführt, gefolgt von run_dataproc_hadoop
und schließlich delete_dataproc_cluster
.
Zusammengenommen sieht der Code so aus:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. Airflow-Dateien in Cloud Storage hochladen
DAG in den Ordner „/dags“ kopieren
- Öffnen Sie zuerst Cloud Shell, in dem das Cloud SDK bereits für Sie installiert ist.
- Python-Beispiel-Repository klonen und in das Verzeichnis „composer/workflows“ wechseln
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- Führen Sie den folgenden Befehl aus, um den Namen Ihres DAGs-Ordners als Umgebungsvariable festzulegen.
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \ --location us-central1 --format="value(config.dagGcsPrefix)")
- Führen Sie den folgenden
gsutil
-Befehl aus, um den Anleitungscode in den Ordner „/dags“ zu kopieren
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
Die Ausgabe sieht in etwa so aus:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. Airflow-UI verwenden
So greifen Sie mit der GCP Console auf die Airflow-Weboberfläche zu:
|
So greifen Sie auf die Airflow-Weboberfläche zu.
Variablen ansehen
Die zuvor erstellten Variablen sind dauerhaft in Ihrer Umgebung vorhanden. Sie können sie aufrufen, indem Sie in der Airflow-UI-Menüleiste Verwaltung > Variablen auswählen.
DAG-Ausführungen untersuchen
Wenn Sie die DAG-Datei in den Ordner dags
in Cloud Storage hochladen, parst Cloud Composer die Datei. Werden keine Fehler gefunden, ist der Name des Workflows in der DAG-Liste zu sehen und der Workflow wird zur sofortigen Ausführung in die Warteschlange gestellt. Zum Aufrufen Ihrer DAGs klicken Sie oben auf der Seite auf DAGs.
Klicken Sie auf composer_hadoop_tutorial
, um die DAG-Detailseite zu öffnen. Sie enthält eine grafische Darstellung der Workflowaufgaben und ‑abhängigkeiten.
Klicken Sie in der Symbolleiste auf Graph View und bewegen Sie den Mauszeiger auf die Grafik, um den Status der Aufgaben zu sehen. Der Rahmen um jede Aufgabe gibt auch deren Status an (grün = läuft; rot = fehlgeschlagen usw.).
So führen Sie den Workflow über die Grafikansicht noch einmal aus:
- Klicken Sie in der Grafikansicht der Airflow-Benutzeroberfläche auf die Grafik
create_dataproc_cluster
. - Klicken Sie auf Löschen, um die drei Aufgaben zurückzusetzen, und dann zur Bestätigung auf OK.
Sie können den Status und die Ergebnisse des composer-hadoop-tutorial
-Workflows auch auf den folgenden Seiten der GCP Console prüfen:
- Cloud Dataproc-Cluster zum Überwachen der Clustererstellung und -löschung. Beachten Sie, dass der vom Workflow erstellte Cluster sitzungsspezifisch ist: Er ist nur für die Dauer des Workflows vorhanden und wird im Rahmen der letzten Workflowaufgabe gelöscht.
- Cloud Dataproc-Jobs zum Anzeigen oder Überwachen des Apache Hadoop-Wordcount-Jobs. Klicken Sie auf die Job-ID, um die Ausgabe des Job-Logs aufzurufen.
- Cloud Storage-Browser, um die Ergebnisse der Wortzählung im Ordner
wordcount
des Cloud Storage-Buckets anzusehen, den Sie für dieses Codelab erstellt haben.
7. Bereinigen
So vermeiden Sie, dass Ihrem GCP-Konto die in diesem Codelab verwendeten Ressourcen in Rechnung gestellt werden:
- Optional: Wenn Sie Ihre Daten speichern möchten, laden Sie sie aus dem Cloud Storage-Bucket für die Cloud Composer-Umgebung und aus dem Speicher-Bucket herunter, den Sie für dieses Codelab erstellt haben.
- Löschen Sie den Cloud Storage-Bucket, den Sie für dieses Codelab erstellt haben.
- Löschen Sie den Cloud Storage-Bucket für die Umgebung.
- Löschen Sie die Cloud Composer-Umgebung. Der Storage-Bucket wird beim Löschen der Umgebung nicht gelöscht.
Optional können Sie das Projekt auch löschen:
- Rufen Sie in der GCP Console die Seite Projekte auf.
- Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
- Geben Sie im Feld die Projekt-ID ein und klicken Sie auf Herunterfahren, um das Projekt zu löschen.