1. Introducción
Los flujos de trabajo son un caso de uso común en el análisis de datos. Implican transferir, transformar y analizar datos para encontrar la información significativa que contienen. En Google Cloud Platform, la herramienta para organizar flujos de trabajo es Cloud Composer, que es una versión alojada de Apache Airflow, la popular herramienta de código abierto para flujos de trabajo. En este lab, usarás Cloud Composer para crear un flujo de trabajo simple que genere un clúster de Cloud Dataproc, lo analice con Cloud Dataproc y Apache Hadoop y, luego, borre el clúster de Cloud Dataproc.
¿Qué es Cloud Composer?
Cloud Composer es un servicio de organización del flujo de trabajo completamente administrado que te da la posibilidad de crear, programar y supervisar canalizaciones que se distribuyen en servicios en la nube y en centros de datos locales. Cloud Composer se basa en el popular proyecto de código abierto Apache Airflow y se opera mediante el lenguaje de programación Python, por lo que es fácil de usar y no implica compromisos a largo plazo.
Con Cloud Composer en lugar de una instancia local de Apache Airflow, los usuarios pueden beneficiarse de lo mejor de Airflow sin sobrecarga de instalación ni administración.
¿Qué es Apache Airflow?
Apache Airflow es una herramienta de código abierto que se usa para crear, programar y supervisar flujos de trabajo de manera programática. Hay algunos términos clave relacionados con Airflow que verás a lo largo del lab:
- DAG: Un DAG (grafo acíclico dirigido) es una colección de tareas organizadas que deseas programar y ejecutar. Los DAG, también llamados flujos de trabajo, se definen en archivos estándares de Python.
- Operador: Un operador describe una sola tarea en un flujo de trabajo.
¿Qué es Cloud Dataproc?
Cloud Dataproc es el servicio de Apache Spark y Apache Hadoop completamente administrado de Google Cloud Platform. Cloud Dataproc se integra fácilmente en otros servicios de GCP, lo que te proporciona una plataforma completa y potente para el procesamiento de datos, las estadísticas y el aprendizaje automático.
Actividades
En este codelab, se muestra cómo crear y ejecutar un flujo de trabajo de Apache Airflow en Cloud Composer que complete las siguientes tareas:
- Crea un clúster de Cloud Dataproc
- Ejecuta un trabajo de conteo de palabras de Apache Hadoop en el clúster y envía los resultados a Cloud Storage.
- Eliminación del clúster
Qué aprenderás
- Cómo crear y ejecutar un flujo de trabajo de Apache Airflow en Cloud Composer
- Cómo usar Cloud Composer y Cloud Dataproc para ejecutar análisis en un conjunto de datos
- Cómo acceder a tu entorno de Cloud Composer a través de Google Cloud Platform Console, el SDK de Cloud y la interfaz web de Airflow
Requisitos
- Cuenta de GCP
- Conocimientos básicos de la CLI
- Conocimientos básicos de Python
2. Cómo configurar GCP
Crea el proyecto
Selecciona o crea un proyecto de Google Cloud Platform.
Toma nota del ID de tu proyecto, que usarás en pasos posteriores.
Si vas a crear un proyecto nuevo, el ID del proyecto se encuentra justo debajo del nombre del proyecto en la página de creación. | |
Si ya creaste un proyecto, puedes encontrar el ID en la página principal de la consola, en la tarjeta Información del proyecto. |
Habilitar las API
Habilita las APIs de Cloud Composer, Cloud Dataproc y Cloud Storage. Una vez que estén habilitadas, puedes ignorar el botón que dice "Ir a credenciales" y continuar con el siguiente paso del instructivo. |
Crea un entorno de Composer
Crea un entorno de Cloud Composer con la siguiente configuración:
El resto de los parámetros de configuración pueden permanecer con los valores predeterminados. Haz clic en "Crear" en la parte inferior. |
Crea un bucket de Cloud Storage
En tu proyecto, crea un bucket de Cloud Storage con la siguiente configuración:
Presiona "Crear" cuando hayas terminado |
3. Configura Apache Airflow
Visualiza información del entorno de Composer
En GCP Console, abre la página Entornos.
Haz clic en el nombre del entorno para ver sus detalles.
En la página Detalles del entorno, se proporciona información como la URL de la interfaz web de Airflow, el ID de clúster de Google Kubernetes Engine, el nombre del bucket de Cloud Storage y la ruta de acceso a la carpeta /dags.
En Airflow, un DAG (grafo acíclico dirigido) es una colección de tareas organizadas que deseas programar y ejecutar. Los DAG, también llamados flujos de trabajo, se definen en archivos estándares de Python. Cloud Composer solo programa los DAG en la carpeta /dags. La carpeta /dags se encuentra en el bucket de Cloud Storage que Cloud Composer crea automáticamente cuando creas tu entorno.
Configura las variables de entorno de Apache Airflow
Las variables de Apache Airflow son un concepto específico de Airflow que difiere de las variables de entorno. En este paso, establecerás las siguientes tres variables de Airflow: gcp_project
, gcs_bucket
y gce_zone
.
Usa gcloud
para establecer variables
Primero, abre Cloud Shell, que tiene el SDK de Cloud instalado de forma conveniente.
Establece la variable de entorno COMPOSER_INSTANCE
según el nombre de tu entorno de Composer.
COMPOSER_INSTANCE=my-composer-environment
Para configurar variables de Airflow con la herramienta de línea de comandos de gcloud, usa el comando gcloud composer environments run
con el subcomando variables
. Este comando gcloud composer
ejecuta el subcomando variables
de la CLI de Airflow. El subcomando pasa los argumentos a la herramienta de línea de comandos de gcloud
.
Ejecutarás este comando tres veces y reemplazarás las variables por las relevantes para tu proyecto.
Configura gcp_project
con el siguiente comando y reemplaza <your-project-id> por el ID del proyecto que tomaste nota en el paso 2.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcp_project <your-project-id>
El resultado se verá similar al siguiente:
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke. Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc [2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449 [2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database). [2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor [2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied. [2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg [2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
Configura gcs_bucket
con el siguiente comando y reemplaza <your-bucket-name>
por el ID del bucket que anotaste en el paso 2. Si seguiste nuestra recomendación, el nombre del bucket es el mismo que el ID de tu proyecto. El resultado será similar al del comando anterior.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
Configura gce_zone
con el siguiente comando. El resultado será similar a los comandos anteriores.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gce_zone us-central1-a
(Opcional) Usa gcloud
para ver una variable
Para ver el valor de una variable, ejecuta el subcomando variables
de la CLI de Airflow con el argumento get
o usa la IU de Airflow.
Por ejemplo:
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --get gcs_bucket
Puedes hacerlo con cualquiera de las tres variables que acabas de configurar: gcp_project
, gcs_bucket
y gce_zone
.
4. Flujo de trabajo de muestra
Veamos el código del DAG que usaremos en el paso 5. Todavía no te preocupes por descargar archivos, solo sigue aquí.
Hay mucho que desentrañar aquí, así que analicémoslo un poco.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
Comenzamos con algunas importaciones de Airflow:
airflow.models
: Nos permite acceder a datos y crearlos en la base de datos de Airflow.airflow.contrib.operators
: Es el lugar donde viven los operadores de la comunidad. En este caso, necesitamosdataproc_operator
para acceder a la API de Cloud Dataproc.airflow.utils.trigger_rule
: Para agregar reglas de activación a nuestros operadores. Las reglas de activación permiten un control detallado sobre si un operador debe ejecutarse en relación con el estado de sus elementos superiores.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
Esto especifica la ubicación de nuestro archivo de salida. La línea importante aquí es models.Variable.get('gcs_bucket')
, que tomará el valor de la variable gcs_bucket
de la base de datos de Airflow.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR
: Es la ubicación del archivo .jar que ejecutaremos en el clúster de Cloud Dataproc. Ya está alojada en GCP para ti.input_file
: Es la ubicación del archivo que contiene los datos que nuestro trabajo de Hadoop procesará en algún momento. Subiremos los datos a esa ubicación juntos en el paso 5.wordcount_args
: Son los argumentos que pasaremos al archivo jar.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
Esto nos proporcionará un objeto de fecha y hora equivalente que representa la medianoche del día anterior. Por ejemplo, si se ejecuta a las 11:00 a.m. del 4 de marzo, el objeto fecha y hora representaría las 00:00 a.m. del 3 de marzo. Esto tiene que ver con la forma en que Airflow maneja la programación. Obtén más información aquí.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
Se debe proporcionar la variable default_dag_args
en forma de diccionario cada vez que se crea un DAG nuevo:
'email_on_failure'
: Indica si se deben enviar alertas por correo electrónico cuando falló una tarea.'email_on_retry'
: Indica si se deben enviar alertas por correo electrónico cuando se vuelve a intentar una tarea.'retries'
: Indica la cantidad de intentos de reinyección que debe realizar Airflow en caso de que falle un DAG.'retry_delay'
: Indica cuánto tiempo debe esperar Airflow antes de intentar un nuevo intento.'project_id'
: Indica al DAG con qué ID del proyecto de GCP debe asociarlo, lo que se necesitará más adelante con el operador de Dataproc.
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
El uso de with models.DAG
le indica a la secuencia de comandos que incluya todo lo que se encuentra debajo de ella dentro del mismo DAG. También vemos que se pasan tres argumentos:
- El primero, una cadena, es el nombre que le daremos al DAG que estamos creando. En este caso, usamos
composer_hadoop_tutorial
. schedule_interval
: Es un objetodatetime.timedelta
, que aquí se configura como un día. Esto significa que este DAG intentará ejecutarse una vez al día después del'start_date'
que se configuró antes en'default_dag_args'
.default_args
: Es el diccionario que creamos antes que contiene los argumentos predeterminados del DAG.
Cree un clúster de Dataproc
A continuación, crearemos un dataproc_operator.DataprocClusterCreateOperator
que generará un clúster de Cloud Dataproc.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
Dentro de este operador, vemos algunos argumentos, todos excepto el primero son específicos de este operador:
task_id
: Al igual que en BashOperator, este es el nombre que le asignamos al operador, que se puede ver desde la IU de Airflow.cluster_name
: Es el nombre que asignamos al clúster de Cloud Dataproc. Aquí, lo llamamoscomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(consulta el cuadro de información para obtener información adicional opcional).num_workers
: Es la cantidad de trabajadores que asignamos al clúster de Cloud Dataproc.zone
: Es la región geográfica en la que queremos que se encuentre el clúster, como se guardó en la base de datos de Airflow. Esto leerá la variable'gce_zone'
que configuramos en el paso 3.master_machine_type
: el tipo de máquina que queremos asignar a la instancia principal de Cloud Dataprocworker_machine_type
: Es el tipo de máquina que queremos asignar al trabajador de Cloud Dataproc.
Envía un trabajo de Apache Hadoop
dataproc_operator.DataProcHadoopOperator
nos permite enviar un trabajo a un clúster de Cloud Dataproc.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
Proporcionamos varios parámetros:
task_id
: Es el nombre que asignamos a esta parte del DAG.main_jar
: Ubicación del archivo .jar que queremos ejecutar en el clústercluster_name
: Es el nombre del clúster en el que se ejecutará el trabajo, que notarás que es idéntico al que encontramos en el operador anterior.arguments
: Son argumentos que se pasan al archivo JAR, como lo harías si ejecutaras el archivo .jar desde la línea de comandos.
Borre el clúster
El último operador que crearemos es el dataproc_operator.DataprocClusterDeleteOperator
.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
Como su nombre lo indica, este operador borrará un clúster de Cloud Dataproc determinado. Aquí vemos tres argumentos:
task_id
: Al igual que en BashOperator, este es el nombre que asignamos al operador, que se puede ver desde la IU de Airflow.cluster_name
: Es el nombre que le asignamos al clúster de Cloud Dataproc. Aquí, lo llamamoscomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(consulta el cuadro de información después de "Crea un clúster de Dataproc" para obtener información adicional opcional).trigger_rule
. Mencionamos brevemente las reglas del activador durante las importaciones al comienzo de este paso, pero ahora tenemos una en acción. De forma predeterminada, un operador de Airflow no se ejecuta, a menos que todos sus operadores ascendentes lo hayan completado correctamente. La regla del activadorALL_DONE
solo requiere que se hayan completado todos los operadores upstream, independientemente de si se completaron correctamente o no. Aquí, esto significa que, incluso si el trabajo de Hadoop falla, aún queremos destruir el clúster.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
Por último, queremos que estos operadores se ejecuten en un orden particular, y podemos indicar esto con los operadores de desplazamiento de bits de Python. En este caso, siempre se ejecutará create_dataproc_cluster
primero, seguido de run_dataproc_hadoop
y, por último, delete_dataproc_cluster
.
Cuando se junta todo, el código se ve de la siguiente manera:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. Sube archivos de Airflow a Cloud Storage
Copia el DAG en tu carpeta /dags
- Primero, abre Cloud Shell, que tiene el SDK de Cloud instalado de forma conveniente.
- Clona el repositorio de muestras de Python y cambia al directorio composer/workflows.
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- Ejecuta el siguiente comando para establecer el nombre de tu carpeta de DAG en una variable de entorno
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \ --location us-central1 --format="value(config.dagGcsPrefix)")
- Ejecuta el siguiente comando
gsutil
para copiar el código del instructivo en el lugar donde se crea tu carpeta /dags
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
Deberías obtener un resultado similar al siguiente:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. Cómo usar la IU de Airflow
Sigue estos pasos para acceder a la interfaz web de Airflow desde la consola de GCP:
|
Para obtener información sobre la IU de Airflow, consulta Cómo acceder a la interfaz web.
Ver variables
Las variables que configuraste anteriormente se conservan en tu entorno. Para verlas, selecciona Administrador > Variables en la barra de menú de la IU de Airflow.
Explora las ejecuciones de DAG
Cuando subas tu archivo DAG a la carpeta dags
en Cloud Storage, Cloud Composer analizará el archivo. Si no se encuentran errores, el nombre del flujo de trabajo aparecerá en la lista del DAG y el flujo de trabajo se pondrá en cola para ejecutarse de inmediato. Para verlos, haz clic en DAG en la parte superior de la página.
Haz clic en composer_hadoop_tutorial
para abrir la página de detalles del DAG. En esta página, se incluye una representación gráfica de las dependencias y tareas del flujo de trabajo.
En la barra de herramientas, haz clic en Vista de gráfico y desplaza el mouse sobre el gráfico de cada tarea para ver su estado. Ten en cuenta que el borde de cada tarea también indica el estado (borde verde: running; rojo: failed; etcétera).
Para volver a ejecutar el flujo de trabajo desde la Vista del gráfico (Graph View), sigue estos pasos:
- En la Vista del gráfico (Graph View) de la IU de Airflow, haz clic en el gráfico
create_dataproc_cluster
. - Haz clic en Borrar para restablecer las tres tareas y, luego, en Aceptar para confirmar.
También puedes verificar el estado y los resultados del flujo de trabajo de composer-hadoop-tutorial
en las siguientes páginas de GCP Console:
- Clústeres de Cloud Dataproc para supervisar la creación y eliminación de clústeres. Ten en cuenta que el clúster que creó el flujo de trabajo es efímero: solo existe mientras dure el flujo de trabajo y se borra como parte de la última tarea del flujo de trabajo.
- Trabajos de Cloud Dataproc para ver o supervisar el trabajo de recuento de palabras de Apache Hadoop Haz clic en el ID del trabajo para ver el resultado del registro de trabajos.
- Usa el navegador de Cloud Storage para ver los resultados del recuento de palabras en la carpeta
wordcount
del bucket de Cloud Storage que creaste para este codelab.
7. Limpieza
Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de GCP por los recursos que usaste en este codelab:
- Si quieres guardar tus datos, descarga los datos del bucket de Cloud Storage para el entorno de Cloud Composer y del bucket de almacenamiento que creaste para este codelab (opcional).
- Borra el bucket de Cloud Storage que creaste para este codelab.
- Borra el bucket de Cloud Storage del entorno.
- Borra el entorno de Cloud Composer. Ten en cuenta que, si borras el entorno, no se borrará el bucket de almacenamiento correspondiente.
De manera opcional, también puedes borrar el proyecto:
- En GCP Console, ve a la página Proyectos.
- En la lista de proyectos, selecciona el que quieres borrar y haz clic en Borrar.
- En el cuadro, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrarlo.