1. Introduzione
I flussi di lavoro sono un caso d'uso comune nell'analisi dei dati: implicano l'importazione, la trasformazione e l'analisi dei dati per trovare le informazioni significative al loro interno. Nella piattaforma Google Cloud, lo strumento per l'orchestrazione dei flussi di lavoro è Cloud Composer, una versione in hosting del popolare strumento per flussi di lavoro open source Apache Airflow. In questo lab utilizzerai Cloud Composer per creare un semplice flusso di lavoro che crea un cluster Cloud Dataproc, lo analizza utilizzando Cloud Dataproc e Apache Hadoop, quindi elimina il cluster Cloud Dataproc in seguito.
Che cos'è Cloud Composer?
Cloud Composer è un servizio di orchestrazione del flusso di lavoro completamente gestito che consente di creare, programmare e monitorare pipeline su cloud e in data center on-premise. Realizzato sulla base del noto progetto open source Apache Airflow e gestito tramite il linguaggio di programmazione Python, Cloud Composer è privo di vincoli e facile da utilizzare.
Utilizzando Cloud Composer al posto di un'istanza locale di Apache Airflow, gli utenti possono trarre vantaggio dal meglio di Airflow senza overhead per l'installazione o la gestione.
Che cos'è Apache Airflow?
Apache Airflow è uno strumento open source utilizzato per creare, pianificare e monitorare i flussi di lavoro in modo programmatico. Ci sono alcuni termini chiave da ricordare relativi ad Airflow che vedrai nel lab:
- DAG: un DAG (Directed Acyclic Graph) è una raccolta di attività organizzate che vuoi pianificare ed eseguire. I DAG, chiamati anche flussi di lavoro, sono definiti in file Python standard
- Operatore: un operatore descrive una singola attività in un flusso di lavoro
Che cos'è Cloud Dataproc?
Cloud Dataproc è il servizio Apache Spark e Apache Hadoop completamente gestito della piattaforma Google Cloud. Cloud Dataproc si integra facilmente con altri servizi Google Cloud, offrendo una piattaforma completa e potente per l'elaborazione dei dati, l'analisi e il machine learning.
Che cosa farai
Questo codelab ti mostra come creare ed eseguire un flusso di lavoro Apache Airflow in Cloud Composer che completa le attività seguenti:
- Crea un cluster Cloud Dataproc
- Esegue un job di conteggio parole di Apache Hadoop sul cluster e invia i risultati a Cloud Storage
- Elimina il cluster
Cosa imparerai a fare
- Come creare ed eseguire un flusso di lavoro Apache Airflow in Cloud Composer
- Come utilizzare Cloud Composer e Cloud Dataproc per eseguire un'analisi su un set di dati
- Come accedere all'ambiente Cloud Composer tramite la console di Google Cloud Platform, Cloud SDK e l'interfaccia web di Airflow
Che cosa ti serve
- Account Google Cloud
- Conoscenza di base dell'interfaccia a riga di comando
- Conoscenza di base di Python
2. Configurazione di Google Cloud
Crea il progetto
Seleziona o crea un progetto della piattaforma Google Cloud.
Prendi nota dell'ID progetto, che utilizzerai nei passaggi successivi.
Se stai creando un nuovo progetto, l'ID progetto si trova proprio sotto il nome del progetto nella pagina di creazione. | |
Se hai già creato un progetto, puoi trovare l'ID nella home page della console nella scheda Informazioni sul progetto |
Abilita le API
Abilita le API Cloud Composer, Cloud Dataproc e Cloud Storage. Una volta abilitate, puoi ignorare il pulsante "Vai a Credenziali" e passare al passaggio successivo del tutorial. |
Crea ambiente Composer
Crea un ambiente Cloud Composer con la seguente configurazione:
Tutte le altre configurazioni possono rimanere predefinite. Fai clic su "Crea" in basso. |
Crea un bucket Cloud Storage
Nel tuo progetto, crea un bucket Cloud Storage con la seguente configurazione:
Premi "Crea" quando è tutto pronto |
3. Configurazione di Apache Airflow
Visualizzazione delle informazioni sull'ambiente Composer
Nella console di Google Cloud, apri la pagina Ambienti.
Fai clic sul nome dell'ambiente per visualizzarne i dettagli.
La pagina Dettagli ambiente fornisce informazioni come l'URL dell'interfaccia web di Airflow, l'ID cluster Google Kubernetes Engine, il nome del bucket Cloud Storage e il percorso della cartella /dags.
In Airflow, un DAG (Directed Acyclic Graph) è una raccolta di attività organizzate da pianificare ed eseguire. I DAG, chiamati anche flussi di lavoro, sono definiti in file Python standard. Cloud Composer pianifica solo i DAG nella cartella /dags. La cartella /dags si trova nel bucket Cloud Storage che Cloud Composer crea automaticamente quando crei il tuo ambiente.
Impostazione delle variabili di ambiente Apache Airflow
Le variabili Apache Airflow sono un concetto specifico di questa piattaforma e sono diverse dalle variabili di ambiente. In questo passaggio, imposterai le seguenti tre variabili Airflow: gcp_project
, gcs_bucket
e gce_zone
.
Utilizzare gcloud
per impostare le variabili
Per prima cosa, apri Cloud Shell, in cui è installato Cloud SDK.
Imposta la variabile di ambiente COMPOSER_INSTANCE
sul nome dell'ambiente Composer
COMPOSER_INSTANCE=my-composer-environment
Per impostare le variabili Airflow utilizzando lo strumento a riga di comando gcloud, utilizza il comando gcloud composer environments run
con il sottocomando variables
. Questo comando gcloud composer
esegue il sottocomando dell'interfaccia a riga di comando di Airflow variables
. Il sottocomando passa gli argomenti allo strumento a riga di comando gcloud
.
Eseguirai questo comando tre volte, sostituendo le variabili con quelle pertinenti al tuo progetto.
Imposta gcp_project
utilizzando il seguente comando, sostituendo <your-project-id> con l'ID progetto che hai preso nota nel passaggio 2.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcp_project <your-project-id>
L'output sarà simile a questo
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke. Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc [2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449 [2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database). [2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor [2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied. [2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg [2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
Imposta gcs_bucket
utilizzando il seguente comando, sostituendo <your-bucket-name>
con l'ID bucket che hai annotato nel passaggio 2. Se hai seguito il nostro suggerimento, il nome del bucket corrisponde all'ID progetto. L'output sarà simile al comando precedente.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
Imposta gce_zone
utilizzando il seguente comando. L'output sarà simile a quello dei comandi precedenti.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gce_zone us-central1-a
(Facoltativo) Utilizzo di gcloud
per visualizzare una variabile
Per vedere il valore di una variabile, esegui il sottocomando dell'interfaccia a riga di comando di Airflow variables
con l'argomento get
oppure utilizza l'interfaccia utente di Airflow.
Ad esempio:
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --get gcs_bucket
Puoi farlo con una qualsiasi delle tre variabili appena impostate: gcp_project
, gcs_bucket
e gce_zone
.
4. Flusso di lavoro di esempio
Diamo un'occhiata al codice per il DAG che utilizzeremo nel passaggio 5. Non preoccuparti di scaricare ancora i file, ti basterà seguire questa pagina.
Ci sono molti aspetti da prendere in considerazione qui, quindi analizziamoli un po'.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
Iniziamo con alcune importazioni di Airflow:
airflow.models
: ci consente di accedere ai dati del database Airflow e crearli.airflow.contrib.operators
: dove vivono gli operatori della community. In questo caso, abbiamo bisogno didataproc_operator
per accedere all'API Cloud Dataproc.airflow.utils.trigger_rule
- Per aggiungere regole di attivazione ai nostri operatori. Le regole trigger consentono un controllo granulare sull'eventuale esecuzione di un operatore in relazione allo stato dei relativi elementi padre.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
Questo specifica la posizione del file di output. La riga più importante è models.Variable.get('gcs_bucket')
, che acquisisce il valore della variabile gcs_bucket
dal database Airflow.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR
: posizione del file .jar che verrà eseguito sul cluster Cloud Dataproc. È già ospitata su Google Cloud per te.input_file
: la posizione del file contenente i dati su cui verrà eseguito il calcolo del nostro job Hadoop. Caricheremo i dati nella località insieme nel passaggio 5.wordcount_args
: argomenti che passeremo nel file jar.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
In questo modo otteniamo un oggetto data/ora equivalente che rappresenta la mezzanotte del giorno precedente. Ad esempio, se questo viene eseguito alle 11:00 del 4 marzo, l'oggetto datetime rappresenterà 00:00 il 3 marzo. Questo ha a che fare con il modo in cui Airflow gestisce la pianificazione. Puoi trovare ulteriori informazioni qui.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
La variabile default_dag_args
sotto forma di dizionario deve essere fornita ogni volta che viene creato un nuovo DAG:
'email_on_failure'
: indica se devono essere inviati avvisi via email in caso di errore di un'attività'email_on_retry'
: indica se devono essere inviati avvisi via email quando viene eseguito un nuovo tentativo per un'attività'retries'
: indica il numero di nuovi tentativi che Airflow dovrebbe effettuare in caso di errore del DAG'retry_delay'
: indica per quanto tempo Airflow deve attendere prima di tentare un nuovo tentativo'project_id'
- Indica al DAG l'ID progetto Google Cloud a cui associarlo, che sarà necessario in seguito con l'operatore Dataproc
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
L'utilizzo di with models.DAG
indica allo script di includere tutto ciò che si trova sotto all'interno dello stesso DAG. Vediamo anche tre argomenti passati:
- La prima, una stringa, è il nome per assegnare al DAG che stiamo creando. In questo caso, utilizziamo
composer_hadoop_tutorial
. schedule_interval
: un oggettodatetime.timedelta
, che qui abbiamo impostato su un giorno. Ciò significa che questo DAG tenterà di eseguire una volta al giorno dopo il giorno'start_date'
impostato in precedenza in'default_dag_args'
default_args
: il dizionario creato in precedenza contenente gli argomenti predefiniti per il DAG
Crea un cluster Dataproc
Successivamente, creeremo un dataproc_operator.DataprocClusterCreateOperator
che crea un cluster Cloud Dataproc.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
All'interno di questo operatore, vediamo alcuni argomenti, tutti tranne il primo:
task_id
: come in BashOperator, questo è il nome che assegniamo all'operatore, che è visibile dalla UI di Airflowcluster_name
: il nome assegnato al cluster Cloud Dataproc. Qui l'abbiamo chiamatocomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(vedi la casella informativa per informazioni aggiuntive facoltative)num_workers
: il numero di worker allocati al cluster Cloud Dataproczone
: la regione geografica in cui vogliamo risiedere il cluster, come salvata nel database Airflow. Verrà letta la variabile'gce_zone'
impostata nel passaggio 3master_machine_type
- Il tipo di macchina da allocare al master Cloud Dataprocworker_machine_type
: il tipo di macchina che vogliamo allocare al worker Cloud Dataproc
Inviare un job Apache Hadoop
dataproc_operator.DataProcHadoopOperator
ci consente di inviare un job a un cluster Cloud Dataproc.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
Forniamo diversi parametri:
task_id
- Nome che assegniamo a questa parte del DAGmain_jar
- Percorso del file .jar che vogliamo eseguire nel clustercluster_name
: nome del cluster su cui eseguire il job, che è identico a quello dell'operatore precedentearguments
. Argomenti che vengono passati nel file jar, come se eseguisse il file .jar dalla riga di comando.
Elimina il cluster
L'ultimo operatore che creeremo è dataproc_operator.DataprocClusterDeleteOperator
.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
Come suggerisce il nome, questo operatore elimina un determinato cluster Cloud Dataproc. Qui vediamo tre argomenti:
task_id
: come in BashOperator, questo è il nome che assegniamo all'operatore, che è visibile dalla UI di Airflowcluster_name
- Il nome assegnato al cluster Cloud Dataproc. Qui lo abbiamo chiamatocomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(vedi la casella informativa dopo "Crea un cluster Dataproc" per ulteriori informazioni facoltative)trigger_rule
- Abbiamo parlato brevemente delle regole di attivazione durante le importazioni all'inizio di questo passaggio, ma eccone una in azione. Per impostazione predefinita, un operatore Airflow non viene eseguito a meno che tutti i suoi operatori a monte non siano stati completati correttamente. La regola di attivazioneALL_DONE
richiede solo che tutti gli operatori a monte siano stati completati, indipendentemente dal fatto che siano stati completati o meno. In questo caso, anche se il job Hadoop non è riuscito, vogliamo comunque eliminare il cluster.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
Infine, vogliamo che questi operatori vengano eseguiti in un determinato ordine e possiamo indicarlo utilizzando gli operatori bitshift Python. In questo caso, create_dataproc_cluster
verrà sempre eseguito per primo, seguito da run_dataproc_hadoop
e infine da delete_dataproc_cluster
.
Mettendo tutto insieme, il codice avrà il seguente aspetto:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. Carica i file Airflow in Cloud Storage
Copia il DAG nella cartella /dags
- Innanzitutto, apri Cloud Shell, in cui è installato Cloud SDK.
- Clona il repository di esempi di Python e passa alla directory composer/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- Esegui questo comando per impostare il nome della cartella dei DAG su una variabile di ambiente
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \ --location us-central1 --format="value(config.dagGcsPrefix)")
- Esegui questo comando
gsutil
per copiare il codice del tutorial nel punto in cui viene creata la cartella /dags
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
L'output sarà simile al seguente:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. usa l'UI di Airflow
Per accedere all'interfaccia web di Airflow utilizzando la console Google Cloud:
|
Per informazioni sulla UI di Airflow, consulta Accesso all'interfaccia web di Airflow.
Visualizza variabili
Le variabili impostate in precedenza vengono mantenute nel tuo ambiente. Puoi visualizzare le variabili selezionando Admin > Variabili dalla barra dei menu della UI di Airflow.
Esplorazione delle esecuzioni dei DAG
Quando carichi il file DAG nella cartella dags
di Cloud Storage, Cloud Composer analizza il file. Se non vengono rilevati errori, il nome del flusso di lavoro viene visualizzato nell'elenco di DAG e il flusso di lavoro viene messo in coda per essere eseguito immediatamente. Per visualizzare i DAG, fai clic su DAG nella parte superiore della pagina.
Fai clic su composer_hadoop_tutorial
per aprire la pagina dei dettagli del DAG. Questa pagina include una rappresentazione grafica delle attività e delle dipendenze del flusso di lavoro.
Ora, nella barra degli strumenti, fai clic su Visualizzazione grafico e poi passa il mouse sopra il grafico di ogni attività per visualizzarne lo stato. Puoi notare che il bordo attorno a ciascuna attività indica anche lo stato (bordo verde = in esecuzione; rosso = non riuscita e così via).
Per eseguire di nuovo il flusso di lavoro dalla visualizzazione Grafico:
- Nella visualizzazione del grafico dell'interfaccia utente di Airflow, fai clic sull'immagine
create_dataproc_cluster
. - Fai clic su Cancella per reimpostare le tre attività, quindi fai clic su OK per confermare.
Puoi anche controllare lo stato e i risultati del flusso di lavoro composer-hadoop-tutorial
visitando le seguenti pagine della console Google Cloud:
- Cluster di Cloud Dataproc per monitorare la creazione e l'eliminazione del cluster. Tieni presente che il cluster creato dal flusso di lavoro è temporaneo. Esiste solo per la durata del flusso di lavoro e viene eliminato nell'ambito dell'ultima attività del flusso.
- Job Cloud Dataproc per visualizzare o monitorare il job di conteggio parole di Apache Hadoop. Fai clic sull'ID job per visualizzarne l'output del log.
- Cloud Storage Browser per visualizzare i risultati del conteggio delle parole nella cartella
wordcount
del bucket Cloud Storage che hai creato per questo codelab.
7. Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questo codelab:
- (Facoltativo) Per salvare i dati, scaricali dal bucket Cloud Storage per l'ambiente Cloud Composer e dal bucket di archiviazione che hai creato per questo codelab.
- Elimina il bucket Cloud Storage che hai creato per questo codelab.
- Elimina il bucket Cloud Storage per l'ambiente.
- Elimina l'ambiente Cloud Composer. Tieni presente che l'eliminazione dell'ambiente non comporta l'eliminazione del bucket di archiviazione per l'ambiente.
Se vuoi, puoi anche eliminare il progetto:
- Nella console di Google Cloud, vai alla pagina Progetti.
- Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare e fai clic su Elimina.
- Nella casella, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.