1. Introdução
O Apache Airflow foi projetado para executar DAGs regularmente, mas também pode acioná-los em resposta a eventos, como uma alteração em um bucket do Cloud Storage ou uma mensagem enviada ao Cloud Pub/Sub. Para isso, é possível acionar os DAGs do Cloud Composer pelo Cloud Functions.
No exemplo deste laboratório, um DAG simples é executado sempre que ocorre uma alteração em um bucket do Cloud Storage. Esse DAG usa o BashOperator para executar um comando bash, imprimindo as informações de alteração sobre o que foi enviado para o bucket do Cloud Storage.
Antes de iniciar este laboratório, recomendamos concluir os codelabs Introdução ao Cloud Composer e Introdução ao Cloud Functions. Você vai poder usar um ambiente do Composer criado no codelab Introdução ao Cloud Composer neste laboratório.
O que você vai criar
Neste codelab, você aprenderá a:
- Faça o upload de um arquivo no Google Cloud Storage, que
- Acionar uma função do Google Cloud usando o ambiente de execução do Node.JS
- Esta função vai executar um DAG no Google Cloud Composer
- Isso executa um comando bash simples, imprimindo a alteração no bucket do Google Cloud Storage
O que você vai aprender
- Como acionar um DAG do Apache Airflow usando o Google Cloud Functions + Node.js
O que é necessário
- Conta do GCP
- Noções básicas de JavaScript
- Conhecimento básico do Cloud Composer/Airflow e Cloud Functions
- Conforto ao usar comandos da CLI
2. Como configurar o GCP
Selecione ou crie o projeto
Selecione ou crie um projeto do Google Cloud Platform. Se você estiver criando um novo projeto, siga estas etapas.
Anote o ID do projeto, porque ele será usado nas próximas etapas.
Se você estiver criando um novo projeto, o ID dele pode ser encontrado logo abaixo do nome do projeto na página de criação. | |
Se você já criou um projeto, é possível encontrar o ID na página inicial do console no card "Informações do projeto" |
Ative as APIs
Criar um ambiente do Composer
Crie um ambiente do Cloud Composer com a seguinte configuração:
Todas as outras configurações podem permanecer nos valores padrão. Clique em "Criar". na parte de baixo. Anote o nome e o local do ambiente do Composer. Você vai precisar deles nas próximas etapas. |
Crie o bucket do Cloud Storage
No seu projeto, crie um bucket do Cloud Storage com a seguinte configuração:
Pressione "Criar" Quando estiver tudo pronto, anote o nome do bucket do Cloud Storage para as etapas posteriores. |
3. Como configurar o Google Cloud Functions (GCF)
Para configurar o GCF, vamos executar comandos no Google Cloud Shell.
Embora o Google Cloud possa ser operado remotamente em um laptop usando a ferramenta de linha de comando gcloud, neste codelab vamos usar o Google Cloud Shell, um ambiente de linha de comando executado no Cloud.
O Cloud Shell é uma máquina virtual com base em Debian que contém todas as ferramentas de desenvolvimento necessárias. Ela oferece um diretório principal permanente de 5 GB, além de ser executada no Google Cloud, o que aprimora o desempenho e a autenticação da rede. Isso significa que tudo que você precisa para este codelab é um navegador (sim, funciona em um Chromebook).
Para ativar o Google Cloud Shell, no console do desenvolvedor, clique no botão no canto superior direito. O provisionamento e a conexão ao ambiente devem levar apenas alguns instantes. |
Conceder permissões de assinatura de blob à conta de serviço do Cloud Functions
Para que o GCF faça a autenticação no Cloud IAP, o proxy que protege o servidor da Web do Airflow, conceda o papel Service Account Token Creator
ao GCF da conta de serviço do Appspot. Para isso, execute o comando a seguir no Cloud Shell, substituindo <your-project-id>
pelo nome do projeto.
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Por exemplo, se o nome do projeto for my-project
, o comando será:
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Como conseguir o ID de cliente
Para criar um token e autenticar-se no Cloud IAP, a função requer o ID do cliente do proxy que protege o servidor da Web do Airflow. A API Cloud Composer não fornece essas informações diretamente. Em vez disso, faça uma solicitação não autenticada ao servidor da Web do Airflow e capture o ID do cliente no URL de redirecionamento. Vamos fazer isso executando um arquivo Python usando o Cloud Shell para capturar o ID do cliente.
Faça o download do código necessário no GitHub executando o seguinte comando no Cloud Shell
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
Se você recebeu um erro porque esse diretório já existe, atualize-o para a versão mais recente executando o seguinte comando:
cd python-docs-samples/ git pull origin master
Mude para o diretório apropriado executando
cd python-docs-samples/composer/rest
Execute o código Python para receber seu ID do cliente, substituindo o nome do projeto por <your-project-id>
, o local do ambiente do Composer criado anteriormente por <your-composer-location>
e o nome do ambiente do Composer criado anteriormente para <your-composer-environment>
.
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
Por exemplo, se o nome do projeto for my-project
, o local do Composer for us-central1
e o nome do ambiente for my-composer
, o comando será
python3 get_client_id.py my-project us-central1 my-composer
A get_client_id.py
realiza as seguintes ações:
- Autenticar com o Google Cloud
- Faz uma solicitação HTTP não autenticada ao servidor da Web do Airflow para receber o URI de redirecionamento
- Extrai o parâmetro de consulta
client_id
desse redirecionamento - Imprime para você usar
Seu ID do cliente será impresso na linha de comando e ficará mais ou menos assim:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. Crie sua função
No Cloud Shell, clone o repositório com o exemplo de código necessário executando:
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
Mude para o diretório necessário e deixe o Cloud Shell aberto enquanto conclui as próximas etapas
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
Navegue até a página do Google Cloud Functions clicando no menu de navegação e, em seguida, em "Cloud Functions". | |
Clique em "CREATE FUNCTION" no topo da página | |
Nomeie a função como "my-function" e deixar a memória com o padrão, 256 MB. | |
Defina o gatilho como "Cloud Storage", deixe o tipo de evento como "Finalizar/Criar" e navegue até o bucket que você criou na etapa "Criar um bucket do Cloud Storage". | |
Deixe o código-fonte definido como "Editor in-line" e definir o ambiente de execução como "Node.js 8" |
No Cloud Shell, execute o comando a seguir. Isso abrirá index.js e package.json no Editor do Cloud Shell.
cloudshell edit index.js package.json
Clique na guia package.json, copie o código e cole-o na seção package.json do editor in-line do Cloud Functions. | |
Defina a "função a ser executada" para triggerDag | |
Clique na guia index.js, copie e cole o código na seção index.js do editor in-line do Cloud Functions | |
Altere |
No Cloud Shell, execute o comando a seguir, substituindo <your-environment-name> pelo nome do seu ambiente do Composer e <your-composer-region> pela região em que o ambiente do Composer está localizado.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
Por exemplo, se o nome do ambiente for my-composer-environment
e estiver localizado em us-central1
, o comando será
gcloud composer environments describe my-composer-environment --location us-central1
A resposta será semelhante a esta:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
Nessa saída, procure a variável com o nome | |
Clique no botão "Mais" link suspenso e escolha a região geograficamente mais próxima de você | |
Marque "Tentar novamente em caso de falha". | |
Clique em "Criar". para criar uma função do Cloud |
Percorrer o código
O código que você copiou do index.js vai ficar mais ou menos assim:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
Vamos analisar o que está acontecendo. Há três funções aqui: triggerDag
, authorizeIap
e makeIapPostRequest
triggerDag
é a função acionada quando fazemos upload de algo para o bucket designado do Cloud Storage. É nele que configuramos variáveis importantes usadas em outras solicitações, como PROJECT_ID
, CLIENT_ID
, WEBSERVER_ID
e DAG_NAME
. Ele chama authorizeIap
e makeIapPostRequest
.
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap
faz uma solicitação ao proxy que protege o servidor da Web do Airflow, usando uma conta de serviço e "troca" um JWT para um token de ID que será usado para autenticar o makeIapPostRequest
.
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest
faz uma chamada ao servidor da Web do Airflow para acionar o composer_sample_trigger_response_dag.
. O nome do DAG é incorporado no URL do servidor da Web do Airflow transmitido com o parâmetro url
, e o idToken
é o token que recebemos na solicitação authorizeIap
.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. Configurar seu DAG
No Cloud Shell, mude para o diretório com os fluxos de trabalho de amostra. Ele faz parte do python-docs-samples que você baixou do GitHub na etapa Receber o ID do cliente.
cd cd python-docs-samples/composer/workflows
Faça upload do DAG no Composer
Faça upload do DAG de amostra no bucket de armazenamento do DAG do ambiente do Composer com o seguinte comando, em que <environment_name>
é o nome do ambiente do Composer e <location>
é o nome da região em que ele está localizado. trigger_response_dag.py
é o DAG com que vamos trabalhar.
gcloud composer environments storage dags import \ --environment <environment_name> \ --location <location> \ --source trigger_response_dag.py
Por exemplo, se o nome do ambiente do Composer for my-composer
e estiver localizado em us-central1
, o comando será:
gcloud composer environments storage dags import \ --environment my-composer \ --location us-central1 \ --source trigger_response_dag.py
Percorrer o DAG
O código DAG em trigger_response.py
é parecido com este:
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
A seção default_args
contém os argumentos padrão, conforme exigido pelo modelo BaseOperator no Apache Airflow. Você veria essa seção com esses parâmetros em qualquer DAG do Apache Airflow. O owner
está definido como Composer Example
, mas você pode alterá-lo para ser seu nome. depends_on_past
mostra que este DAG não depende de nenhum DAG anterior. As três seções de e-mail, email
, email_on_failure
e email_on_retry
, são definidas para que nenhuma notificação por e-mail chegue com base no status desse DAG. O DAG vai tentar novamente apenas uma vez, já que retries
está definido como 1 e vai fazer isso após cinco minutos, por retry_delay
. O start_date
normalmente determina quando um DAG precisa ser executado com o schedule_interval
(definido posteriormente), mas, no caso deste DAG, não é relevante. Ela está definida como 1o de janeiro de 2017, mas pode ser definida para qualquer data no passado.
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
A seção with airflow.DAG
configura o DAG que será executado. Ela será executada com o ID de tarefa composer_sample_trigger_response_dag
, os argumentos padrão da seção default_args
e, o mais importante, com um schedule_interval
de None
. O schedule_interval
está definido como None
porque estamos acionando esse DAG específico com nossa função do Cloud. É por isso que o start_date
em default_args
não é relevante.
Quando executado, o DAG imprime a própria configuração, conforme ditado na variável print_gcs_info
.
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. Testar a função
Abra seu ambiente do Composer e, na linha com o nome do ambiente, clique no link do Airflow | |
Clique no nome da | |
Abra outra guia e faça upload de qualquer arquivo para o bucket do Cloud Storage criado anteriormente e especificado como o gatilho da função do Cloud. Para isso, acesse o Console ou use um comando gsutil. | |
Volte para a guia com a interface do Airflow e clique em "Graph View" | |
Clique na tarefa | |
Clique em "Exibir log". no canto superior direito do menu | |
Nos registros, são exibidas informações sobre o arquivo que você enviou para o bucket do Cloud Storage. |
Parabéns! Você acabou de acionar um DAG do Airflow usando Node.js e o Google Cloud Functions.
7. Limpeza
Para evitar cobranças na sua conta do GCP pelo uso de recursos neste guia de início rápido, siga estas etapas:
- (Opcional) Para salvar seus dados, faça o download deles do bucket do Cloud Storage referente ao ambiente do Cloud Composer e do bucket de armazenamento que você criou para este guia de início rápido.
- Exclua o bucket do Cloud Storage que você criou no ambiente
- Exclua o ambiente do Cloud Composer. A exclusão do ambiente não remove o bucket de armazenamento dele.
- (Opcional) Na computação sem servidor, os primeiros 2 milhões de invocações por mês são sem custo financeiro, e quando você escalona a função para zero, não há cobrança. Consulte os preços para mais detalhes. No entanto, se você quiser excluir sua função do Cloud, faça isso clicando em "EXCLUIR" no canto superior direito da página de visão geral da sua função
Também é possível excluir o projeto:
- No Console do GCP, acesse a página Projetos.
- Na lista de projetos, selecione o projeto que você quer excluir e clique em Excluir.
- Na caixa, digite o ID do projeto e clique em Encerrar para excluí-lo.