Attivazione di un DAG con Node.JS e Google Cloud Functions

1. Introduzione

Apache Airflow è progettato per eseguire i DAG su una pianificazione regolare, ma puoi anche attivare i DAG in risposta a eventi, come una modifica in un bucket Cloud Storage o un push di un messaggio a Cloud Pub/Sub. A questo scopo, i DAG di Cloud Composer possono essere attivati da Cloud Functions.

L'esempio in questo lab esegue un semplice DAG ogni volta che si verifica una modifica in un bucket Cloud Storage. Questo DAG utilizza BashOperator per eseguire un comando bash che stampa le informazioni sulle modifiche relative a ciò che è stato caricato nel bucket Cloud Storage.

Prima di iniziare questo lab, è consigliabile completare i codelab Introduzione a Cloud Composer e Getting Started with Cloud Functions. Se crei un ambiente Composer nel codelab introduzione a Cloud Composer, puoi utilizzare questo ambiente in questo lab.

Cosa creerai

In questo codelab, imparerai a:

  1. Carica un file su Google Cloud Storage, che
  2. Attiva una funzione Google Cloud Functions utilizzando il runtime Node.JS
  3. Questa funzione eseguirà un DAG in Google Cloud Composer
  4. che esegue un semplice comando bash che stampa la modifica al bucket Google Cloud Storage

1d3d3736624a923f.png

Obiettivi didattici

  • Come attivare un DAG Apache Airflow utilizzando Google Cloud Functions + Node.js

Che cosa ti serve

  • Account Google Cloud
  • Conoscenza di base di JavaScript
  • Conoscenza di base di Cloud Composer/Airflow e Cloud Functions
  • Comfort con i comandi dell'interfaccia a riga di comando

2. Configurazione di Google Cloud

Seleziona o crea il progetto

Seleziona o crea un progetto della piattaforma Google Cloud. Se stai creando un nuovo progetto, segui i passaggi indicati qui.

Prendi nota dell'ID progetto, che utilizzerai nei passaggi successivi.

Se stai creando un nuovo progetto, l'ID progetto si trova subito sotto il nome del progetto nella pagina di creazione

Se hai già creato un progetto, puoi trovare l'ID nella home page della console nella scheda Informazioni sul progetto

Abilita le API

Abilita le API Cloud Composer, Google Cloud Functions e Cloud Identity e Google Identity and Access Management (IAM).

Crea ambiente Composer

Crea un ambiente Cloud Composer con la seguente configurazione:

  • Nome: ambiente-mio-composer
  • Posizione: qualsiasi località geograficamente più vicina a te
  • Zona: qualsiasi zona della regione

Tutte le altre configurazioni possono rimanere su quelle predefinite. Fai clic su "Crea" in basso.Prendi nota del nome e della posizione del tuo ambiente Composer: ti serviranno nei passaggi futuri.

Crea un bucket Cloud Storage

Nel tuo progetto, crea un bucket Cloud Storage con la seguente configurazione:

  • Nome: <your-project-id>
  • Classe di archiviazione predefinita: multiregionale
  • Posizione: qualunque località sia geograficamente più vicina alla regione di Cloud Composer che stai utilizzando
  • Modello di controllo degli accessi: imposta autorizzazioni a livello di oggetto e di bucket

Premi "Crea" quando è tutto prontoAssicurati di prendere nota del nome del tuo bucket Cloud Storage per i passaggi successivi.

3. Configurazione di Google Cloud Functions (GCF)

Per configurare GCF, eseguiremo i comandi in Google Cloud Shell.

Google Cloud può essere gestito da remoto dal tuo laptop utilizzando lo strumento a riga di comando gcloud, in questo codelab utilizzeremo Google Cloud Shell, un ambiente a riga di comando in esecuzione nel cloud.

Questa macchina virtuale basata su Debian viene caricata con tutti gli strumenti di sviluppo necessari. Offre una home directory permanente da 5 GB e viene eseguita su Google Cloud, migliorando notevolmente le prestazioni di rete e l'autenticazione. Ciò significa che per questo codelab è sufficiente un browser (sì, funziona su Chromebook).

Per attivare Google Cloud Shell, dalla console per gli sviluppatori fai clic sul pulsante in alto a destra (il provisioning e la connessione all'ambiente dovrebbero richiedere solo qualche istante):

Concedi le autorizzazioni di firma del blob all'account di servizio Cloud Functions

Per consentire a GCF di eseguire l'autenticazione su Cloud IAP, il proxy che protegge il server web Airflow, devi concedere al GCF dell'account di servizio Appspot il ruolo Service Account Token Creator. Per farlo, esegui questo comando in Cloud Shell, sostituendo il nome del progetto con <your-project-id>.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Ad esempio, se il tuo progetto si chiama my-project, il comando sarebbe

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Ottenere l'ID client

Per creare un token per eseguire l'autenticazione in Cloud IAP, la funzione richiede l'ID client del proxy che protegge il server web Airflow. L'API Cloud Composer non fornisce direttamente queste informazioni. Effettua invece una richiesta non autenticata al server web Airflow e acquisisci l'ID client dall'URL di reindirizzamento. Per farlo, eseguiamo un file Python utilizzando Cloud Shell per acquisire l'ID client.

Scarica il codice necessario da GitHub eseguendo il comando seguente in Cloud Shell

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Se hai ricevuto un errore perché la directory esiste già, aggiornala alla versione più recente eseguendo questo comando

cd python-docs-samples/
git pull origin master

Passa alla directory appropriata eseguendo

cd python-docs-samples/composer/rest

Esegui il codice Python per ottenere il tuo ID client, sostituendo il nome del progetto per <your-project-id>, la posizione dell'ambiente Composer che hai creato in precedenza per <your-composer-location> e il nome dell'ambiente Composer che hai creato in precedenza per <your-composer-environment>

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

Ad esempio, se il nome del progetto è my-project, la posizione di Composer è us-central1 e il nome dell'ambiente è my-composer, il comando sarà

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py effettua le seguenti operazioni:

  • Autenticazione con Google Cloud
  • Effettua una richiesta HTTP non autenticata al server web Airflow per ottenere l'URI di reindirizzamento
  • Estrae il parametro di query client_id dal reindirizzamento
  • Lo stampa per consentirti di utilizzarlo

Il tuo ID client verrà stampato sulla riga di comando e avrà un aspetto simile a questo:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. Crea la tua funzione

In Cloud Shell, clona il repository con il codice campione necessario eseguendo

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Passa alla directory necessaria e lascia aperto Cloud Shell mentre completi i passaggi successivi

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

Vai alla pagina Google Cloud Functions facendo clic sul menu di navigazione, quindi su "Cloud Functions"

Fai clic su "CREA FUNZIONE" nella parte superiore della pagina

Assegna alla funzione il nome "my-function" e lasciare il valore predefinito di 256 MB.

Imposta il trigger su "Cloud Storage", lascia il tipo di evento "Finalizza/Crea" e vai al bucket che hai creato nel passaggio Crea un bucket Cloud Storage.

Lascia il codice sorgente impostato su "Editor incorporato". e imposta il runtime su "Node.js 8"

Esegui questo comando in Cloud Shell. Nell'editor di Cloud Shell verranno aperti index.js e package.json

cloudshell edit index.js package.json

Fai clic sulla scheda package.json, copia il codice e incollalo nella sezione package.json dell'editor in linea di Cloud Functions

Imposta "Function to Running" (Funzione su Esecuzione) per attivareDag

Fai clic sulla scheda index.js, copia il codice e incollalo nella sezione index.js dell'editor in linea di Cloud Functions

Sostituisci PROJECT_ID con il tuo ID progetto e CLIENT_ID con l'ID client che hai salvato nel passaggio Recupero dell'ID client. NON fare clic su "Crea" Ci sono ancora alcune cose da colmare.

In Cloud Shell, esegui questo comando, sostituendo <nome-ambiente> con il nome del tuo ambiente Composer e <regione-tuo-composer> con la regione in cui si trova l'ambiente Composer.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

Ad esempio, se il tuo ambiente è denominato my-composer-environment e si trova in us-central1, il comando sarebbe

gcloud composer environments describe my-composer-environment --location us-central1

L'output dovrebbe essere simile al seguente:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

Nell'output, cerca la variabile denominata airflowUri. Nel codice index.js, cambia WEBSERVER_ID in modo che corrisponda all'ID del server web Airflow. È la parte della variabile airflowUri che avrà "-tp" alla fine, ad esempio abc123efghi456k-tp

Fai clic sul pulsante "Altro" seleziona l'area geograficamente più vicina a te

Seleziona "Riprova in caso di errore"

Fai clic su "Crea" per creare la funzione Cloud Functions

Analisi del codice

Il codice copiato da index.js avrà un aspetto simile al seguente:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Diamo un'occhiata a cosa sta succedendo. Qui sono presenti tre funzioni: triggerDag, authorizeIap e makeIapPostRequest

triggerDag è la funzione che viene attivata quando carichiamo qualcosa nel bucket Cloud Storage designato. È qui che configuriamo le variabili importanti utilizzate nelle altre richieste, come PROJECT_ID, CLIENT_ID, WEBSERVER_ID e DAG_NAME. Chiama authorizeIap e makeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap effettua una richiesta al proxy che protegge il server web Airflow, utilizzando un account di servizio e "scambiando" un JWT per un token ID che verrà utilizzato per autenticare makeIapPostRequest.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest effettua una chiamata al server web Airflow per attivare composer_sample_trigger_response_dag.. Il nome del DAG è incorporato nell'URL del server web Airflow passato con il parametro url e idToken è il token che abbiamo ottenuto nella richiesta authorizeIap.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. Configura il tuo DAG

In Cloud Shell, passa alla directory con i flussi di lavoro di esempio. Fa parte del file python-docs-samples scaricato da GitHub nel passaggio relativo al recupero dell'ID client.

cd
cd python-docs-samples/composer/workflows

Carica il DAG in Composer

Carica il DAG di esempio nel bucket di archiviazione DAG del tuo ambiente Composer con il seguente comando, dove <environment_name> è il nome del tuo ambiente Composer e <location> è il nome della regione in cui si trova. trigger_response_dag.py è il DAG con cui lavoreremo.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

Ad esempio, se il tuo ambiente Composer fosse denominato my-composer e si trovasse in us-central1, il comando sarebbe

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

Attraverso il DAG

Il codice DAG in trigger_response.py ha questo aspetto

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

La sezione default_args contiene gli argomenti predefiniti richiesti dal modello BaseOperator in Apache Airflow. Questa sezione viene visualizzata con questi parametri in qualsiasi DAG Apache Airflow. owner è attualmente impostato su Composer Example, ma puoi modificarlo in modo che sia il tuo nome, se vuoi. depends_on_past mostra che questo DAG non dipende da nessun DAG precedente. Le tre sezioni delle email email, email_on_failure e email_on_retry sono impostate in modo che non arrivino notifiche via email in base allo stato di questo DAG. Il DAG eseguirà un solo nuovo tentativo poiché retries è impostato su 1 e lo farà dopo cinque minuti, per retry_delay. start_date normalmente determina quando deve essere eseguito un DAG, insieme al suo schedule_interval (impostato in seguito), ma nel caso di questo DAG non è pertinente. È impostata sul 1° gennaio 2017, ma potrebbe essere impostata su una data passata.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

La sezione with airflow.DAG configura il DAG che verrà eseguito. Verrà eseguita con l'ID attività composer_sample_trigger_response_dag, gli argomenti predefiniti della sezione default_args e, soprattutto, con un valore schedule_interval di None. schedule_interval è impostato su None perché stiamo attivando questo particolare DAG con la nostra funzione Cloud Functions. Per questo motivo, l'elemento start_date in default_args non è pertinente.

Quando viene eseguito, il DAG stampa la sua configurazione, come indicato nella variabile print_gcs_info.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. Testare la funzione

Apri l'ambiente Composer e nella riga con il nome dell'ambiente, fai clic sul link Airflow.

Apri composer_sample_trigger_response_dag facendo clic sul suo nome. Al momento non ci sono prove delle esecuzioni dei DAG, perché non abbiamo ancora attivato l'esecuzione del DAG.Se questo DAG non è visibile o cliccabile, attendi un minuto e aggiorna la pagina.

Apri una scheda separata e carica qualsiasi file nel bucket Cloud Storage che hai creato in precedenza e specificato come trigger per la funzione Cloud Functions. Per farlo, puoi utilizzare la console o un comando gsutil.

Torna alla scheda con la UI di Airflow e fai clic su Visualizzazione grafico

Fai clic sull'attività print_gcs_info, che dovrebbe contornare in verde

Fai clic su "Visualizza log" in alto a destra nel menu

Nei log vedrai le informazioni sul file che hai caricato nel bucket Cloud Storage.

Complimenti! Hai appena attivato un DAG Airflow utilizzando Node.js e Google Cloud Functions.

7. Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa guida rapida:

  1. (Facoltativo) Per salvare i dati, scarica i dati dal bucket Cloud Storage per l'ambiente Cloud Composer e dal bucket di archiviazione che hai creato per questa guida rapida.
  2. Elimina il bucket Cloud Storage relativo all'ambiente e che hai creato
  3. Eliminare l'ambiente Cloud Composer. Tieni presente che l'eliminazione dell'ambiente non comporta l'eliminazione del bucket di archiviazione per l'ambiente.
  4. (Facoltativo) Con il serverless computing, i primi 2 milioni di chiamate al mese sono senza costi e quando scala la funzione fino a zero non ti viene addebitato alcun costo (consulta i prezzi per maggiori dettagli). Se invece vuoi eliminare la funzione Cloud Functions, fai clic su "ELIMINA" in alto a destra nella pagina Panoramica della funzione

4fe11e1b41b32ba2.png

Se vuoi, puoi anche eliminare il progetto:

  1. Nella console di Google Cloud, vai alla pagina Progetti.
  2. Nell'elenco dei progetti, seleziona il progetto che vuoi eliminare e fai clic su Elimina.
  3. Nella casella, digita l'ID progetto, quindi fai clic su Chiudi per eliminare il progetto.