1. Introduzione
Apache Airflow è progettato per eseguire i DAG su una pianificazione regolare, ma puoi anche attivare i DAG in risposta a eventi, come una modifica in un bucket Cloud Storage o un push di un messaggio a Cloud Pub/Sub. A questo scopo, i DAG di Cloud Composer possono essere attivati da Cloud Functions.
L'esempio in questo lab esegue un semplice DAG ogni volta che si verifica una modifica in un bucket Cloud Storage. Questo DAG utilizza BashOperator per eseguire un comando bash che stampa le informazioni sulle modifiche relative a ciò che è stato caricato nel bucket Cloud Storage.
Prima di iniziare questo lab, è consigliabile completare i codelab Introduzione a Cloud Composer e Getting Started with Cloud Functions. Se crei un ambiente Composer nel codelab introduzione a Cloud Composer, puoi utilizzare questo ambiente in questo lab.
Cosa creerai
In questo codelab, imparerai a:
- Carica un file su Google Cloud Storage, che
- Attiva una funzione Google Cloud Functions utilizzando il runtime Node.JS
- Questa funzione eseguirà un DAG in Google Cloud Composer
- che esegue un semplice comando bash che stampa la modifica al bucket Google Cloud Storage
Obiettivi didattici
- Come attivare un DAG Apache Airflow utilizzando Google Cloud Functions + Node.js
Che cosa ti serve
- Account Google Cloud
- Conoscenza di base di JavaScript
- Conoscenza di base di Cloud Composer/Airflow e Cloud Functions
- Comfort con i comandi dell'interfaccia a riga di comando
2. Configurazione di Google Cloud
Seleziona o crea il progetto
Seleziona o crea un progetto della piattaforma Google Cloud. Se stai creando un nuovo progetto, segui i passaggi indicati qui.
Prendi nota dell'ID progetto, che utilizzerai nei passaggi successivi.
Se stai creando un nuovo progetto, l'ID progetto si trova subito sotto il nome del progetto nella pagina di creazione | |
Se hai già creato un progetto, puoi trovare l'ID nella home page della console nella scheda Informazioni sul progetto |
Abilita le API
Crea ambiente Composer
Crea un ambiente Cloud Composer con la seguente configurazione:
Tutte le altre configurazioni possono rimanere su quelle predefinite. Fai clic su "Crea" in basso.Prendi nota del nome e della posizione del tuo ambiente Composer: ti serviranno nei passaggi futuri. |
Crea un bucket Cloud Storage
Nel tuo progetto, crea un bucket Cloud Storage con la seguente configurazione:
Premi "Crea" quando è tutto prontoAssicurati di prendere nota del nome del tuo bucket Cloud Storage per i passaggi successivi. |
3. Configurazione di Google Cloud Functions (GCF)
Per configurare GCF, eseguiremo i comandi in Google Cloud Shell.
Google Cloud può essere gestito da remoto dal tuo laptop utilizzando lo strumento a riga di comando gcloud, in questo codelab utilizzeremo Google Cloud Shell, un ambiente a riga di comando in esecuzione nel cloud.
Questa macchina virtuale basata su Debian viene caricata con tutti gli strumenti di sviluppo necessari. Offre una home directory permanente da 5 GB e viene eseguita su Google Cloud, migliorando notevolmente le prestazioni di rete e l'autenticazione. Ciò significa che per questo codelab è sufficiente un browser (sì, funziona su Chromebook).
Per attivare Google Cloud Shell, dalla console per gli sviluppatori fai clic sul pulsante in alto a destra (il provisioning e la connessione all'ambiente dovrebbero richiedere solo qualche istante): |
Concedi le autorizzazioni di firma del blob all'account di servizio Cloud Functions
Per consentire a GCF di eseguire l'autenticazione su Cloud IAP, il proxy che protegge il server web Airflow, devi concedere al GCF dell'account di servizio Appspot il ruolo Service Account Token Creator
. Per farlo, esegui questo comando in Cloud Shell, sostituendo il nome del progetto con <your-project-id>
.
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Ad esempio, se il tuo progetto si chiama my-project
, il comando sarebbe
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Ottenere l'ID client
Per creare un token per eseguire l'autenticazione in Cloud IAP, la funzione richiede l'ID client del proxy che protegge il server web Airflow. L'API Cloud Composer non fornisce direttamente queste informazioni. Effettua invece una richiesta non autenticata al server web Airflow e acquisisci l'ID client dall'URL di reindirizzamento. Per farlo, eseguiamo un file Python utilizzando Cloud Shell per acquisire l'ID client.
Scarica il codice necessario da GitHub eseguendo il comando seguente in Cloud Shell
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
Se hai ricevuto un errore perché la directory esiste già, aggiornala alla versione più recente eseguendo questo comando
cd python-docs-samples/ git pull origin master
Passa alla directory appropriata eseguendo
cd python-docs-samples/composer/rest
Esegui il codice Python per ottenere il tuo ID client, sostituendo il nome del progetto per <your-project-id>
, la posizione dell'ambiente Composer che hai creato in precedenza per <your-composer-location>
e il nome dell'ambiente Composer che hai creato in precedenza per <your-composer-environment>
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
Ad esempio, se il nome del progetto è my-project
, la posizione di Composer è us-central1
e il nome dell'ambiente è my-composer
, il comando sarà
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py
effettua le seguenti operazioni:
- Autenticazione con Google Cloud
- Effettua una richiesta HTTP non autenticata al server web Airflow per ottenere l'URI di reindirizzamento
- Estrae il parametro di query
client_id
dal reindirizzamento - Lo stampa per consentirti di utilizzarlo
Il tuo ID client verrà stampato sulla riga di comando e avrà un aspetto simile a questo:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. Crea la tua funzione
In Cloud Shell, clona il repository con il codice campione necessario eseguendo
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
Passa alla directory necessaria e lascia aperto Cloud Shell mentre completi i passaggi successivi
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
Vai alla pagina Google Cloud Functions facendo clic sul menu di navigazione, quindi su "Cloud Functions" | |
Fai clic su "CREA FUNZIONE" nella parte superiore della pagina | |
Assegna alla funzione il nome "my-function" e lasciare il valore predefinito di 256 MB. | |
Imposta il trigger su "Cloud Storage", lascia il tipo di evento "Finalizza/Crea" e vai al bucket che hai creato nel passaggio Crea un bucket Cloud Storage. | |
Lascia il codice sorgente impostato su "Editor incorporato". e imposta il runtime su "Node.js 8" |
Esegui questo comando in Cloud Shell. Nell'editor di Cloud Shell verranno aperti index.js e package.json
cloudshell edit index.js package.json
Fai clic sulla scheda package.json, copia il codice e incollalo nella sezione package.json dell'editor in linea di Cloud Functions | |
Imposta "Function to Running" (Funzione su Esecuzione) per attivareDag | |
Fai clic sulla scheda index.js, copia il codice e incollalo nella sezione index.js dell'editor in linea di Cloud Functions | |
Sostituisci |
In Cloud Shell, esegui questo comando, sostituendo <nome-ambiente> con il nome del tuo ambiente Composer e <regione-tuo-composer> con la regione in cui si trova l'ambiente Composer.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
Ad esempio, se il tuo ambiente è denominato my-composer-environment
e si trova in us-central1
, il comando sarebbe
gcloud composer environments describe my-composer-environment --location us-central1
L'output dovrebbe essere simile al seguente:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
Nell'output, cerca la variabile denominata | |
Fai clic sul pulsante "Altro" seleziona l'area geograficamente più vicina a te | |
Seleziona "Riprova in caso di errore" | |
Fai clic su "Crea" per creare la funzione Cloud Functions |
Analisi del codice
Il codice copiato da index.js avrà un aspetto simile al seguente:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
Diamo un'occhiata a cosa sta succedendo. Qui sono presenti tre funzioni: triggerDag
, authorizeIap
e makeIapPostRequest
triggerDag
è la funzione che viene attivata quando carichiamo qualcosa nel bucket Cloud Storage designato. È qui che configuriamo le variabili importanti utilizzate nelle altre richieste, come PROJECT_ID
, CLIENT_ID
, WEBSERVER_ID
e DAG_NAME
. Chiama authorizeIap
e makeIapPostRequest
.
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap
effettua una richiesta al proxy che protegge il server web Airflow, utilizzando un account di servizio e "scambiando" un JWT per un token ID che verrà utilizzato per autenticare makeIapPostRequest
.
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest
effettua una chiamata al server web Airflow per attivare composer_sample_trigger_response_dag.
. Il nome del DAG è incorporato nell'URL del server web Airflow passato con il parametro url
e idToken
è il token che abbiamo ottenuto nella richiesta authorizeIap
.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. Configura il tuo DAG
In Cloud Shell, passa alla directory con i flussi di lavoro di esempio. Fa parte del file python-docs-samples scaricato da GitHub nel passaggio relativo al recupero dell'ID client.
cd cd python-docs-samples/composer/workflows
Carica il DAG in Composer
Carica il DAG di esempio nel bucket di archiviazione DAG del tuo ambiente Composer con il seguente comando, dove <environment_name>
è il nome del tuo ambiente Composer e <location>
è il nome della regione in cui si trova. trigger_response_dag.py
è il DAG con cui lavoreremo.
gcloud composer environments storage dags import \ --environment <environment_name> \ --location <location> \ --source trigger_response_dag.py
Ad esempio, se il tuo ambiente Composer fosse denominato my-composer
e si trovasse in us-central1
, il comando sarebbe
gcloud composer environments storage dags import \ --environment my-composer \ --location us-central1 \ --source trigger_response_dag.py
Attraverso il DAG
Il codice DAG in trigger_response.py
ha questo aspetto
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
La sezione default_args
contiene gli argomenti predefiniti richiesti dal modello BaseOperator in Apache Airflow. Questa sezione viene visualizzata con questi parametri in qualsiasi DAG Apache Airflow. owner
è attualmente impostato su Composer Example
, ma puoi modificarlo in modo che sia il tuo nome, se vuoi. depends_on_past
mostra che questo DAG non dipende da nessun DAG precedente. Le tre sezioni delle email email
, email_on_failure
e email_on_retry
sono impostate in modo che non arrivino notifiche via email in base allo stato di questo DAG. Il DAG eseguirà un solo nuovo tentativo poiché retries
è impostato su 1 e lo farà dopo cinque minuti, per retry_delay
. start_date
normalmente determina quando deve essere eseguito un DAG, insieme al suo schedule_interval
(impostato in seguito), ma nel caso di questo DAG non è pertinente. È impostata sul 1° gennaio 2017, ma potrebbe essere impostata su una data passata.
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
La sezione with airflow.DAG
configura il DAG che verrà eseguito. Verrà eseguita con l'ID attività composer_sample_trigger_response_dag
, gli argomenti predefiniti della sezione default_args
e, soprattutto, con un valore schedule_interval
di None
. schedule_interval
è impostato su None
perché stiamo attivando questo particolare DAG con la nostra funzione Cloud Functions. Per questo motivo, l'elemento start_date
in default_args
non è pertinente.
Quando viene eseguito, il DAG stampa la sua configurazione, come indicato nella variabile print_gcs_info
.
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. Testare la funzione
Apri l'ambiente Composer e nella riga con il nome dell'ambiente, fai clic sul link Airflow. | |
Apri | |
Apri una scheda separata e carica qualsiasi file nel bucket Cloud Storage che hai creato in precedenza e specificato come trigger per la funzione Cloud Functions. Per farlo, puoi utilizzare la console o un comando gsutil. | |
Torna alla scheda con la UI di Airflow e fai clic su Visualizzazione grafico | |
Fai clic sull'attività | |
Fai clic su "Visualizza log" in alto a destra nel menu | |
Nei log vedrai le informazioni sul file che hai caricato nel bucket Cloud Storage. |
Complimenti! Hai appena attivato un DAG Airflow utilizzando Node.js e Google Cloud Functions.
7. Esegui la pulizia
Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa guida rapida:
- (Facoltativo) Per salvare i dati, scarica i dati dal bucket Cloud Storage per l'ambiente Cloud Composer e dal bucket di archiviazione che hai creato per questa guida rapida.
- Elimina il bucket Cloud Storage relativo all'ambiente e che hai creato
- Eliminare l'ambiente Cloud Composer. Tieni presente che l'eliminazione dell'ambiente non comporta l'eliminazione del bucket di archiviazione per l'ambiente.
- (Facoltativo) Con il serverless computing, i primi 2 milioni di chiamate al mese sono senza costi e quando scala la funzione fino a zero non ti viene addebitato alcun costo (consulta i prezzi per maggiori dettagli). Se invece vuoi eliminare la funzione Cloud Functions, fai clic su "ELIMINA" in alto a destra nella pagina Panoramica della funzione
Se vuoi, puoi anche eliminare il progetto:
- Nella console di Google Cloud, vai alla pagina Progetti.
- Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare e fai clic su Elimina.
- Nella casella, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.