Activa un DAG con Node.JS y Google Cloud Functions

1. Introducción

Apache Airflow está diseñado para ejecutar DAG de forma periódica, pero también puedes activar DAG en respuesta a eventos, como un cambio en un bucket de Cloud Storage o un mensaje enviado a Cloud Pub/Sub. Para lograr esto, Cloud Functions puede activar los DAG de Cloud Composer.

En el ejemplo de este lab, se ejecuta un DAG simple cada vez que se produce un cambio en un bucket de Cloud Storage. Este DAG usa BashOperator para ejecutar un comando Bash que imprime la información de cambio sobre lo que se subió al bucket de Cloud Storage.

Antes de comenzar este lab, se recomienda completar los codelabs Introducción a Cloud Composer y Comienza a usar Cloud Functions. Si creas un entorno de Composer en el codelab Introducción a Cloud Composer, puedes usarlo en este lab.

Qué compilarás

En este codelab, harás lo siguiente:

  1. Sube un archivo a Google Cloud Storage, que hará lo siguiente:
  2. Activa una función de Google Cloud con el entorno de ejecución de Node.JS
  3. Esta función ejecutará un DAG en Google Cloud Composer
  4. El comando ejecuta un comando Bash simple para imprimir el cambio en el bucket de Google Cloud Storage.

1d3d3736624a923f.png

Lo que aprenderá

  • Cómo activar un DAG de Apache Airflow con Google Cloud Functions y Node.js

Requisitos

  • Cuenta de GCP
  • Conocimientos básicos sobre JavaScript
  • Conocimientos básicos de Cloud Composer/Airflow y Cloud Functions
  • Comodidad con los comandos de la CLI

2. Configura GCP

Selecciona o crea el proyecto

Selecciona o crea un proyecto de Google Cloud Platform. Si estás creando un proyecto nuevo, sigue los pasos que se encuentran aquí.

Toma nota del ID del proyecto, ya que lo usarás en pasos posteriores.

Si vas a crear un proyecto nuevo, el ID del proyecto se encuentra justo debajo del nombre del proyecto en la página de creación.

Si ya creaste un proyecto, puedes encontrar el ID en la página principal de la consola, en la tarjeta de información del proyecto.

Habilitación de las API

Habilitar Cloud Composer, Google Cloud Functions, Cloud Identity y la API de Google Identity and Access Management (IAM).

Crea un entorno de Composer

Crea un entorno de Cloud Composer con la siguiente configuración:

  • Nombre: my-composer-environment
  • Ubicación: la ubicación más cercana a ti geográficamente
  • Zona: cualquier zona en esa región

Todos los demás parámetros de configuración pueden permanecer con sus valores predeterminados. Haz clic en "Crear". en la parte inferior.Anota el nombre y la ubicación de tu entorno de Composer, ya que los necesitarás en pasos futuros.

Crea un bucket de Cloud Storage

En tu proyecto, crea un bucket de Cloud Storage con la siguiente configuración:

  • Nombre: <your-project-id>
  • Clase de almacenamiento predeterminada: Multi-regional
  • Ubicación: Cualquiera que sea la ubicación geográfica más cercana a la región de Cloud Composer que estás usando
  • Modelo de control de acceso: Establece permisos a nivel de objeto y a nivel de bucket

Presiona "Crear" Asegúrate de anotar el nombre de tu bucket de Cloud Storage para los pasos posteriores.

3. Configura Google Cloud Functions (GCF)

Para configurar GCF, ejecutaremos comandos en Google Cloud Shell.

Si bien Google Cloud se puede operar de manera remota desde tu laptop con la herramienta de línea de comandos de gcloud, en este codelab usaremos Google Cloud Shell, un entorno de línea de comandos que se ejecuta en la nube.

Esta máquina virtual basada en Debian está cargada con todas las herramientas de desarrollo que necesitarás. Ofrece un directorio principal persistente de 5 GB y se ejecuta en Google Cloud, lo que permite mejorar considerablemente el rendimiento de la red y la autenticación. Esto significa que todo lo que necesitarás para este Codelab es un navegador (sí, funciona en una Chromebook).

Para activar Google Cloud Shell, en la consola para desarrolladores, haz clic en el botón que se encuentra en la parte superior derecha (el aprovisionamiento y la conexión al entorno solo debería tardar unos minutos):

Otorga permisos de firma de BLOB a la cuenta de servicio de Cloud Functions

Para que GCF se autentique en Cloud IAP, el proxy que protege el servidor web de Airflow, debes otorgar el rol Service Account Token Creator al GCF de la cuenta de servicio de Appspot. Para ello, ejecuta el siguiente comando en Cloud Shell y sustituye el nombre de tu proyecto por <your-project-id>.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Por ejemplo, si tu proyecto se llama my-project, tu comando será

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Cómo obtener el ID de cliente

Para construir un token que se autentique en Cloud IAP, la función requiere el ID de cliente del proxy que protege el servidor web de Airflow. La API de Cloud Composer no proporciona esta información directamente. En su lugar, realiza una solicitud no autenticada al servidor web de Airflow y captura el ID de cliente desde la URL de redireccionamiento. Para hacerlo, ejecutaremos un archivo de Python con Cloud Shell para capturar el ID de cliente.

Descarga el código necesario desde GitHub ejecutando el siguiente comando en Cloud Shell

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Si recibiste un mensaje de error porque este directorio ya existe, ejecuta el siguiente comando para actualizarlo a la versión más reciente

cd python-docs-samples/
git pull origin master

Cambia al directorio adecuado ejecutando

cd python-docs-samples/composer/rest

Ejecuta el código de Python para obtener tu ID de cliente. Para ello, sustituye el nombre de tu proyecto por <your-project-id>, la ubicación del entorno de Composer que creaste anteriormente para <your-composer-location> y el nombre del entorno de Composer que creaste anteriormente para <your-composer-environment>

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

Por ejemplo, si el nombre de tu proyecto es my-project, tu ubicación de Composer es us-central1 y el nombre de tu entorno es my-composer, tu comando sería

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py realiza las siguientes acciones:

  • Se autentica con Google Cloud
  • Realiza una solicitud HTTP no autenticada al servidor web de Airflow para obtener el URI de redireccionamiento.
  • Extrae el parámetro de consulta client_id de ese redireccionamiento.
  • Lo imprime para que lo use

Tu ID de cliente se imprimirá en la línea de comandos y será similar a esto:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. Crea tu función

En Cloud Shell, ejecuta el comando para clonar el repo con el código de muestra necesario.

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Cambia al directorio necesario y deja Cloud Shell abierto mientras completas los siguientes pasos.

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

Haz clic en el menú de navegación y, luego, en “Cloud Functions” para navegar a la página de Google Cloud Functions.

Haz clic en “CREAR FUNCIÓN”. en la parte superior de la página

Asígnale el nombre “my-function” a la función. y deja la memoria predeterminada, 256 MB.

Establece el activador en “Cloud Storage”, deja el tipo de evento como “Finalize/Create” y navega hasta el bucket que creaste en el paso Crea un bucket de Cloud Storage.

Deja el código fuente configurado en "Inline Editor". y estableceré el entorno de ejecución en “Node.js 8”.

En Cloud Shell, ejecuta el siguiente comando. Esto abrirá index.js y package.json en el editor de Cloud Shell

cloudshell edit index.js package.json

Haz clic en la pestaña package.json, copia el código y pégalo en la sección package.json del editor directo de Cloud Functions

Establecer la función que se ejecutará para triggerDag.

Haz clic en la pestaña index.js, copia el código y pégalo en la sección index.js del editor directo de Cloud Functions

Cambia el PROJECT_ID por el ID de tu proyecto y el CLIENT_ID por el ID de cliente que guardaste en el paso Cómo obtener el ID de cliente. NO hagas clic en “Crear”. sin embargo, todavía hay algunas cosas más por completar.

En Cloud Shell, ejecuta el siguiente comando y reemplaza <your-environment-name> por el nombre de tu entorno de Composer y <your-composer-region> con la región en la que se encuentra tu entorno de Composer.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

Por ejemplo, si tu entorno se llama my-composer-environment y se encuentra en us-central1, tu comando será

gcloud composer environments describe my-composer-environment --location us-central1

El resultado debería ser similar a este:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

En ese resultado, busca la variable llamada airflowUri. En tu código de index.js, cambia WEBSERVER_ID por el ID del servidor web de Airflow; es la parte de la variable airflowUri que tendrá “-tp”. al final, por ejemplo, abc123efghi456k-tp.

Haz clic en el botón "Más" vínculo desplegable y, luego, elige la Región más cercana geográficamente

Verifica "Reintentar en caso de error"

Haz clic en "Crear". para crear tu Cloud Function

Recorre el código

El código que copiaste de index.js se verá de la siguiente manera:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Veamos qué sucede. Aquí hay tres funciones: triggerDag, authorizeIap y makeIapPostRequest

triggerDag es la función que se activa cuando subimos algo al bucket de Cloud Storage designado. Aquí es donde se configuran variables importantes que se usan en las otras solicitudes, como PROJECT_ID, CLIENT_ID, WEBSERVER_ID y DAG_NAME. Llama a authorizeIap y makeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap realiza una solicitud al proxy que protege el servidor web de Airflow mediante una cuenta de servicio y el "intercambio" un JWT para un token de ID que se usará para autenticar el makeIapPostRequest

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest realiza una llamada al servidor web de Airflow para activar composer_sample_trigger_response_dag.. El nombre del DAG está incorporado en la URL del servidor web de Airflow que se pasó con el parámetro url, y idToken es el token que obtuvimos en la solicitud authorizeIap.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. Configura tu DAG

En Cloud Shell, cambia al directorio con los flujos de trabajo de muestra. Es parte de python-docs-samples que descargaste de GitHub en el paso Cómo obtener el ID de cliente.

cd
cd python-docs-samples/composer/workflows

Sube el DAG a Composer

Sube el DAG de muestra al bucket de almacenamiento de DAG del entorno de Composer con el siguiente comando, en el que <environment_name> es el nombre de tu entorno de Composer y <location> es el nombre de la región en la que se encuentra. trigger_response_dag.py es el DAG con el que trabajaremos.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

Por ejemplo, si tu entorno de Composer se llama my-composer y se encuentra en us-central1, tu comando será

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

Explicación del DAG

El código del DAG en trigger_response.py se ve de la siguiente manera:

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

La sección default_args contiene los argumentos predeterminados que requiere el modelo BaseOperator en Apache Airflow. Verías esta sección con estos parámetros en cualquier DAG de Apache Airflow. Actualmente, owner está configurado como Composer Example, pero puedes cambiarlo para que sea tu nombre si lo deseas. depends_on_past nos muestra que este DAG no depende de ningún DAG anterior. Las tres secciones de correo electrónico, email, email_on_failure y email_on_retry, están configuradas para que no lleguen notificaciones por correo electrónico según el estado de este DAG. El DAG solo volverá a intentarlo una vez, ya que retries se establece en 1, y lo hará después de cinco minutos, por retry_delay. Por lo general, start_date determina cuándo se debe ejecutar un DAG, junto con su schedule_interval (configurado más adelante), pero, en el caso de este DAG, no es relevante. Si bien se estableció en el 1 de enero de 2017, se puede establecer en cualquier fecha pasada.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

En la sección with airflow.DAG, se configura el DAG que se ejecutará. Se ejecutará con el ID de tarea composer_sample_trigger_response_dag, los argumentos predeterminados de la sección default_args y, lo más importante, con un schedule_interval de None. schedule_interval se establece en None porque estamos activando este DAG en particular con nuestra Cloud Function. Es por eso que start_date en default_args no es relevante.

Cuando se ejecuta, el DAG imprime su configuración, como se indica en la variable print_gcs_info.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. Prueba tu función

Abre tu entorno de Composer y, en la fila con el nombre de tu entorno, haz clic en el vínculo de Airflow.

Haz clic en el nombre de la composer_sample_trigger_response_dag para abrirla. Por el momento, no habrá evidencia de ejecuciones del DAG, ya que aún no activamos su ejecución.Si no se puede ver este DAG ni se puede hacer clic en él, espera un minuto y actualiza la página.

Abre otra pestaña y sube cualquier archivo al bucket de Cloud Storage que creaste anteriormente y que especificaste como activador de tu Cloud Function. Puedes hacerlo a través de la consola o con un comando de gsutil.

Regresa a la pestaña de la IU de Airflow y haz clic en Graph View.

Haz clic en la tarea print_gcs_info, que debe estar delineada en verde.

Haz clic en "Ver registro". en la parte superior derecha del menú

En los registros, verás información sobre el archivo que subiste al bucket de Cloud Storage.

¡Felicitaciones! Acabas de activar un DAG de Airflow con Node.js y Google Cloud Functions.

7. Limpieza

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de GCP por los recursos que se usan en esta guía de inicio rápido:

  1. Si quieres guardar tus datos, descarga los datos del bucket de Cloud Storage para el entorno de Cloud Composer y del bucket de almacenamiento que creaste para esta guía de inicio rápido (opcional).
  2. Borra el bucket de Cloud Storage para el entorno y que creaste
  3. Borra el entorno de Cloud Composer. Ten en cuenta que borrar tu entorno no borra su bucket de almacenamiento.
  4. (Opcional) Con la computación sin servidores, los primeros 2 millones de invocaciones por mes son gratuitos y, cuando escalas tu función a cero, no se te cobra (consulta los precios para obtener más información). Sin embargo, si deseas borrar tu Cloud Function, haz clic en “BORRAR” para hacerlo. en la parte superior derecha de la página de resumen de la función

4fe11e1b41b32ba2.png

De manera opcional, también puedes borrar el proyecto:

  1. En GCP Console, ve a la página Proyectos.
  2. En la lista de proyectos, selecciona el que quieres borrar y haz clic en Borrar.
  3. En el cuadro, escribe el ID del proyecto y, luego, haz clic en Cerrar para borrarlo.