1. Introdução
Os fluxos de trabalho são um caso de uso comum na análise de dados. Eles envolvem a ingestão, a transformação e a análise de dados para encontrar informações relevantes. A ferramenta do Google Cloud Platform para orquestrar fluxos de trabalho é o Cloud Composer, que é uma versão hospedada da famosa ferramenta de fluxo de trabalho de código aberto Apache Airflow. Neste laboratório, você vai usar o Cloud Composer para criar um fluxo de trabalho simples que cria um cluster do Cloud Dataproc, analisa esse cluster com o Cloud Dataproc e o Apache Hadoop e depois exclui esse cluster.
O que é o Cloud Composer?
O Google Cloud Composer é um serviço totalmente gerenciado de orquestração de fluxos de trabalho que permite criar, agendar e monitorar canais que abrangem nuvens e data centers no local. Criado no conhecido projeto de código aberto Apache Airflow (em inglês) e operado com a linguagem de programação Python, o Cloud Composer é fácil de usar e deixa você livre da dependência tecnológica.
Ao usar o Cloud Composer em vez de uma instância local do Apache Airflow, os usuários podem aproveitar o melhor do Airflow sem sobrecarga de instalação ou de gerenciamento.
O que é o Apache Airflow?
O Apache Airflow é uma ferramenta de código aberto usada para criar, programar e monitorar fluxos de trabalho programaticamente. Você vai ver alguns termos importantes relacionados ao Airflow que serão usados ao longo do laboratório:
- O DAG (gráfico acíclico dirigido) é uma coleção de tarefas organizadas que você quer programar e executar. Os DAGs, também chamados de fluxos de trabalho, são definidos em arquivos Python padrão
- Operador: descreve uma única tarefa em um fluxo de trabalho.
O que é o Cloud Dataproc?
O Cloud Dataproc é o serviço Apache Spark e Apache Hadoop totalmente gerenciado do Google Cloud Platform. O Cloud Dataproc se integra facilmente a outros serviços do GCP, oferecendo uma plataforma completa e avançada para processamento de dados, análise e aprendizado de máquina.
Atividades do laboratório
Neste codelab, mostramos como criar e executar um fluxo de trabalho do Apache Airflow no Cloud Composer que realiza as seguintes tarefas:
- Cria um cluster do Cloud Dataproc
- Executa um job de contagem de palavras do Apache Hadoop no cluster e envia os resultados para o Cloud Storage
- Exclui o cluster
O que você vai aprender
- Como criar e executar um fluxo de trabalho do Apache Airflow no Cloud Composer
- Como usar o Cloud Composer e o Cloud Dataproc para executar uma análise em um conjunto de dados
- Como acessar o ambiente do Cloud Composer pelo Console do Google Cloud Platform, pelo SDK do Cloud e pela interface da Web do Airflow
O que é necessário
- Conta do GCP
- Conhecimentos básicos sobre CLI
- compreensão básica do Python
2. Como configurar o GCP
Criar o projeto
Selecione ou crie um projeto do Google Cloud Platform.
Anote o ID do projeto, que será usado nas próximas etapas.
Se você estiver criando um novo projeto, o ID dele vai estar logo abaixo do nome na página de criação. | |
Se você já criou um projeto, o ID está disponível na página inicial do console, no card "Informações do projeto". |
Ativar as APIs
Ative as APIs Cloud Composer, Cloud Dataproc e Cloud Storage. Depois de ativadas, ignore o botão "Acessar credenciais" e prossiga para a próxima etapa do tutorial. |
Criar ambiente do Composer
Crie um ambiente do Cloud Composer com a seguinte configuração:
Todas as outras configurações podem permanecer nos valores padrão. Clique em "Criar" na parte de baixo. |
Criar um bucket do Cloud Storage
No seu projeto, crie um bucket do Cloud Storage com a seguinte configuração:
Pressione "Criar" quando estiver tudo pronto |
3. Como configurar o Apache Airflow
Como conferir informações do ambiente do Composer
No Console do GCP, abra a página Ambientes
Clique no nome do ambiente para conferir os detalhes.
A página Detalhes do ambiente mostra informações como o URL da interface da Web do Airflow, o ID do cluster do Google Kubernetes Engine, o nome do bucket do Cloud Storage e o caminho da pasta /dags.
No Airflow, um gráfico acíclico dirigido DAG é uma coleção de tarefas organizadas que você quer programar e executar. Os DAGs, também chamados de fluxos de trabalho, são definidos em arquivos Python padrão. O Cloud Composer só programa os DAGs na pasta /dags. A pasta /dags está no bucket do Cloud Storage que o Cloud Composer cria automaticamente quando você cria o ambiente.
Como definir variáveis de ambiente do Apache Airflow
As variáveis do Apache Airflow são um conceito específico da plataforma e diferente das variáveis de ambiente. Nesta etapa, você vai definir as três variáveis do Airflow: gcp_project
, gcs_bucket
e gce_zone
.
Como usar gcloud
para definir variáveis
Primeiro, abra o Cloud Shell, que tem o SDK do Cloud instalado para você.
Defina a variável de ambiente COMPOSER_INSTANCE
como o nome do ambiente do Composer
COMPOSER_INSTANCE=my-composer-environment
Para definir variáveis do Airflow usando a ferramenta de linha de comando gcloud, use o comando gcloud composer environments run
com o subcomando variables
. Esse comando gcloud composer
executa o subcomando variables
da CLI do Airflow. O subcomando transmite os argumentos para a ferramenta de linha de comando gcloud
.
Execute esse comando três vezes, substituindo as variáveis por aquelas relevantes para seu projeto.
Defina o gcp_project
usando o comando a seguir, substituindo <your-project-id> pelo ID do projeto que você anotou na etapa 2.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcp_project <your-project-id>
A saída vai ser mais ou menos assim
kubeconfig entry generated for us-central1-my-composer-env-123abc-gke. Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc [2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449 [2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database). [2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor [2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied. [2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg [2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
Defina gcs_bucket
usando o comando a seguir, substituindo <your-bucket-name>
pelo ID do bucket que você anotou na Etapa 2. Se você seguiu nossa recomendação, o nome do bucket é o mesmo do ID do projeto. A saída será semelhante ao comando anterior.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>
Defina gce_zone
usando o comando a seguir. A resposta será semelhante aos comandos anteriores.
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --set gce_zone us-central1-a
(Opcional) Como usar gcloud
para conferir uma variável
Para ver o valor de uma variável, execute o subcomando variables
da CLI do Airflow com o argumento get
ou use a interface do Airflow.
Exemplo:
gcloud composer environments run ${COMPOSER_INSTANCE} \ --location us-central1 variables -- --get gcs_bucket
Você pode fazer isso com qualquer uma das três variáveis que acabou de definir: gcp_project
, gcs_bucket
e gce_zone
.
4. Exemplo de fluxo de trabalho
Vejamos o código do DAG que usaremos na etapa 5. Não se preocupe em fazer o download de arquivos ainda. Siga as instruções abaixo.
Há muita coisa para explicar aqui, então vamos analisar um pouco.
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
Começamos com algumas importações do Airflow:
airflow.models
: permite acessar e criar dados no banco de dados do Airflow.airflow.contrib.operators
: onde os operadores da comunidade moram. Nesse caso, precisamos dodataproc_operator
para acessar a API Cloud Dataproc.airflow.utils.trigger_rule
: para adicionar regras de gatilho aos nossos operadores. As regras de acionamento permitem controlar com precisão se um operador precisa ser executado em relação ao status dos pais.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
Isso especifica a localização do nosso arquivo de saída. A linha models.Variable.get('gcs_bucket')
, que extrai o valor da variável gcs_bucket
do banco de dados do Airflow,
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
WORDCOUNT_JAR
: local do arquivo .jar que será executado no cluster do Cloud Dataproc. Ele já está hospedado no GCP para você.input_file
: local do arquivo que contém os dados que o job do Hadoop vai calcular. Vamos fazer upload dos dados para esse local na etapa 5.wordcount_args
: argumentos que serão transmitidos no arquivo jar.
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
Isso vai nos dar um objeto de data e hora equivalente que representa a meia-noite do dia anterior. Por exemplo, se isso for executado às 11h do dia 4 de março, o objeto de data e hora vai representar 00h do dia 3 de março. Isso tem a ver com a forma como o Airflow lida com a programação. Veja mais informações neste link.
default_dag_args = {
'start_date': yesterday,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
A variável default_dag_args
na forma de um dicionário precisa ser fornecida sempre que um novo DAG for criado:
'email_on_failure'
: indica se os alertas por e-mail precisam ser enviados quando uma tarefa falha.'email_on_retry'
: indica se alertas por e-mail precisam ser enviados quando uma nova tentativa de tarefa é repetida'retries'
: indica quantas tentativas de nova tentativa o Airflow precisa fazer no caso de uma falha do DAG.'retry_delay'
: indica quanto tempo o Airflow deve aguardar antes de tentar uma nova tentativa.'project_id'
: informa ao DAG qual ID do projeto do GCP associar, o que será necessário mais tarde com o Dataproc Operator.
with models.DAG(
'composer_hadoop_tutorial',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
O uso de with models.DAG
instrui o script a incluir tudo que está abaixo dele no mesmo DAG. Também podemos ver três argumentos transmitidos:
- O primeiro, uma string, é o nome a ser dado ao DAG que estamos criando. Neste caso, estamos usando
composer_hadoop_tutorial
. schedule_interval
: um objetodatetime.timedelta
, que aqui foi definido como um dia. Isso significa que esse DAG vai tentar ser executado uma vez por dia após a'start_date'
definida anteriormente em'default_dag_args'
.default_args
: o dicionário que criamos anteriormente contendo os argumentos padrão do DAG
crie um cluster do Dataproc
Em seguida, criaremos um dataproc_operator.DataprocClusterCreateOperator
que cria um cluster do Cloud Dataproc.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
Nesse operador, há alguns argumentos, todos específicos para esse operador, exceto o primeiro:
task_id
: assim como no BashOperator, esse é o nome que atribuímos ao operador e que pode ser visualizado na interface do Airflow.cluster_name
: o nome que atribuímos ao cluster do Cloud Dataproc. Aqui, o nomeamoscomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
(consulte a caixa de informações para informações adicionais opcionais).num_workers
: o número de workers alocados no cluster do Cloud Dataproczone
: a região geográfica em que queremos que o cluster seja armazenado, conforme salvo no banco de dados do Airflow. Isso vai ler a variável'gce_zone'
definida na etapa 3.master_machine_type
: o tipo de máquina que queremos alocar para o mestre do Cloud Dataprocworker_machine_type
: o tipo de máquina que queremos alocar para o worker do Cloud Dataproc
Enviar um job do Apache Hadoop
O dataproc_operator.DataProcHadoopOperator
permite enviar um job para um cluster do Cloud Dataproc.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
Oferecemos vários parâmetros:
task_id
: nome que atribuímos a essa parte do DAGmain_jar
: local do arquivo .jar que queremos executar no cluster.cluster_name
: nome do cluster em que o job será executado, que é idêntico ao que encontramos no operador anteriorarguments
: argumentos que são transmitidos para o arquivo JAR, como se você estivesse executando o arquivo .jar na linha de comando.
Exclua o cluster
O último operador que vamos criar é o dataproc_operator.DataprocClusterDeleteOperator
.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
Como o nome sugere, esse operador exclui um determinado cluster do Cloud Dataproc. Há três argumentos aqui:
task_id
: assim como no BashOperator, esse é o nome que atribuímos ao operador e que pode ser visualizado na interface do Airflow.cluster_name
: o nome que atribuímos ao cluster do Cloud Dataproc. Aqui, o nomeamoscomposer-hadoop-tutorial-cluster-{{ ds_nodash }}
. Confira mais informações opcionais na caixa de informações depois de "Criar um cluster do Dataproc".trigger_rule
: mencionamos brevemente as regras de acionamento durante as importações no início desta etapa, mas aqui temos uma em ação. Por padrão, um operador do Airflow não é executado a menos que todos os operadores upstream tenham sido concluídos. A regra de acionadorALL_DONE
exige apenas que todos os operadores upstream sejam concluídos, independentemente de terem sido bem-sucedidos ou não. Isso significa que, mesmo que o job do Hadoop falhe, ainda queremos remover o cluster.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
Por fim, queremos que esses operadores sejam executados em uma ordem específica, e podemos denotar isso usando operadores bitshift do Python. Nesse caso, create_dataproc_cluster
sempre será executado primeiro, seguido por run_dataproc_hadoop
e, por fim, delete_dataproc_cluster
.
Juntando tudo, o código vai ficar assim:
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
See https://cloud.google.com/storage/docs/creating-buckets for creating a
bucket.
"""
import datetime
import os
from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule
# Output file for Cloud Dataproc job.
output_file = os.path.join(
models.Variable.get('gcs_bucket'), 'wordcount',
datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]
yesterday = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
default_dag_args = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': yesterday,
# To email on failure or retry set 'email' arg to your email and enable
# emailing here.
'email_on_failure': False,
'email_on_retry': False,
# If a task fails, retry it once after waiting at least 5 minutes
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'project_id': models.Variable.get('gcp_project')
}
# [START composer_hadoop_schedule]
with models.DAG(
'composer_hadoop_tutorial',
# Continue to run DAG once per day
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [END composer_hadoop_schedule]
# Create a Cloud Dataproc cluster.
create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
task_id='create_dataproc_cluster',
# Give the cluster a unique name by appending the date scheduled.
# See https://airflow.apache.org/code.html#default-variables
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
num_workers=2,
zone=models.Variable.get('gce_zone'),
master_machine_type='n1-standard-1',
worker_machine_type='n1-standard-1')
# Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
# master node.
run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
task_id='run_dataproc_hadoop',
main_jar=WORDCOUNT_JAR,
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
arguments=wordcount_args)
# Delete Cloud Dataproc cluster.
delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
task_id='delete_dataproc_cluster',
cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
# Setting trigger_rule to ALL_DONE causes the cluster to be deleted
# even if the Dataproc job fails.
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]
# [END composer_hadoop]
5. fazer upload de arquivos do Airflow para o Cloud Storage
Copiar o DAG para a pasta /dags
- Primeiro, abra o Cloud Shell, que tem o SDK Cloud instalado de forma prática.
- Clonar o repositório de exemplos em Python e mudar para o diretório composer/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
- Execute o seguinte comando para definir o nome da sua pasta de DAGs como uma variável de ambiente
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \ --location us-central1 --format="value(config.dagGcsPrefix)")
- Execute o comando
gsutil
a seguir para copiar o código do tutorial para o local onde sua pasta /dags é criada
gsutil cp hadoop_tutorial.py $DAGS_FOLDER
A resposta será semelhante a esta:
Copying file://hadoop_tutorial.py [Content-Type=text/x-python]... / [1 files][ 4.1 KiB/ 4.1 KiB] Operation completed over 1 objects/4.1 KiB.
6. Como usar a IU do Airflow
Para acessar a interface da Web do Airflow pelo Console do GCP, siga estas etapas:
|
Para saber mais sobre a IU do Airflow, consulte Como acessar a interface da Web.
Conferir variáveis
As variáveis já definidas são mantidas no seu ambiente. Para conferir as variáveis, selecione Admin > Variables na barra de menus da interface do Airflow.
Como o DAG é executado
Quando você faz upload do arquivo DAG para a pasta dags
no Cloud Storage, o Cloud Composer analisa o arquivo. Se nenhum erro for encontrado, o nome do fluxo de trabalho será exibido na lista de DAGs e entrará na fila para ser executado imediatamente. Clique em DAGs na parte de cima da página para conferir os DAGs.
Clique em composer_hadoop_tutorial
para abrir a página de detalhes do DAG. Nela, você encontra uma representação gráfica das tarefas e das dependências do fluxo de trabalho.
Na barra de ferramentas, clique em Graph View e passe o cursor sobre o gráfico de cada tarefa para conferir o status dela. A borda das tarefas também indica o status: verde = em execução, vermelha = falha etc.
Para executar o fluxo de trabalho novamente a partir da Visualização de gráfico:
- Na visualização de gráfico da IU do Airflow, clique no gráfico
create_dataproc_cluster
. - Clique em Clear para redefinir as três tarefas e em OK para confirmar.
Também é possível verificar o status e os resultados do fluxo de trabalho composer-hadoop-tutorial
acessando as seguintes páginas do Console do GCP:
- Clusters do Cloud Dataproc para monitorar a criação e exclusão de clusters. Observe que o cluster criado pelo fluxo de trabalho é efêmero: ele existe apenas durante o fluxo de trabalho e é excluído como parte da última tarefa do fluxo.
- Jobs do Cloud Dataproc para visualizar ou monitorar o job de contagem de palavras do Apache Hadoop. Clique no ID do job para ver a saída do registro dele.
- Navegador do Cloud Storage para conferir os resultados da contagem de palavras na pasta
wordcount
do bucket do Cloud Storage que você criou para este codelab.
7. Limpeza
Para evitar cobranças dos recursos usados neste codelab na conta do GCP, siga estas etapas:
- (Opcional) Para salvar seus dados, faça o download dos dados do bucket do Cloud Storage para o ambiente do Cloud Composer e o bucket de armazenamento que você criou para este codelab.
- Exclua o bucket do Cloud Storage que você criou para este codelab.
- Exclua o bucket do Cloud Storage do ambiente.
- Exclua o ambiente do Cloud Composer. A exclusão do ambiente não remove o bucket de armazenamento dele.
Também é possível excluir o projeto:
- No Console do GCP, acesse a página Projetos.
- Na lista de projetos, selecione o que você quer excluir e clique em Excluir.
- Na caixa, digite o ID do projeto e clique em Encerrar para excluir o projeto.