Déclencher un DAG avec Node.JS et Google Cloud Functions

1. Introduction

Apache Airflow est conçu pour exécuter des DAG de façon régulière, mais vous pouvez également en déclencher en réponse à des événements, tels qu'une modification dans un bucket Cloud Storage ou un message envoyé à Cloud Pub/Sub. Pour ce faire, les DAG Cloud Composer peuvent être déclenchés par Cloud Functions.

L'exemple de cet atelier exécute un DAG simple chaque fois qu'une modification se produit dans un bucket Cloud Storage. Ce DAG utilise BashOperator pour exécuter une commande bash qui affiche les informations de modification concernant les éléments importés dans le bucket Cloud Storage.

Avant de commencer cet atelier, il est recommandé de suivre les ateliers de programmation Présentation de Cloud Composer et Premiers pas avec Cloud Functions. Si vous créez un environnement Composer dans l'atelier de programmation "Présentation de Cloud Composer", vous pouvez l'utiliser dans cet atelier.

Objectifs de l'atelier

Dans cet atelier de programmation, vous allez:

  1. Importez un fichier dans Google Cloud Storage.
  2. Déclencher une fonction Google Cloud à l'aide de l'environnement d'exécution Node.JS
  3. Cette fonction exécutera un DAG dans Google Cloud Composer
  4. Cela exécute une commande bash simple qui affiche la modification dans le bucket Google Cloud Storage.

1d3d3736624a923f.png

Ce que vous allez apprendre

  • Déclencher un DAG Apache Airflow à l'aide de Google Cloud Functions et de Node.js

Ce dont vous avez besoin

  • Compte GCP
  • Connaissances de base en JavaScript
  • Connaissances de base sur Cloud Composer/Airflow et Cloud Functions
  • Maîtriser l'utilisation des commandes CLI

2. Configurer GCP

Sélectionner ou créer le projet

Sélectionnez ou créez un projet Google Cloud Platform. Si vous créez un projet, suivez cette procédure.

Notez l'ID de votre projet, car vous en aurez besoin lors des prochaines étapes.

Si vous créez un projet, son ID se trouve juste en dessous du nom du projet sur la page de création.

Si vous avez déjà créé un projet, vous trouverez son ID sur la page d'accueil de la console, dans sa fiche d'informations

Activer les API

Activez Cloud Composer, Google Cloud Functions, Cloud Identity et l'API IAM (Google Identity and Access Management).

Créer un environnement Composer

Créez un environnement Cloud Composer avec la configuration suivante:

  • Nom: my-composer-environment
  • Emplacement: quel que soit l'emplacement géographiquement le plus proche de vous
  • Zone: toute zone de cette région

Vous pouvez conserver les valeurs par défaut de toutes les autres configurations. Cliquez sur "Créer". au bas de l'écran. Notez le nom et l'emplacement de votre environnement Composer. Vous en aurez besoin dans les prochaines étapes.

Créer un bucket Cloud Storage

Dans votre projet, créez un bucket Cloud Storage avec la configuration suivante:

  • Nom : <id-de-votre-projet>
  • Classe de stockage par défaut: multirégional
  • Emplacement: quel que soit l'emplacement, qui est géographiquement le plus proche de la région Cloud Composer que vous utilisez
  • Modèle de contrôle d'accès: définir des autorisations au niveau de l'objet et du bucket

Appuyez sur "Créer". Lorsque vous êtes prêt, veillez à noter le nom de votre bucket Cloud Storage pour les étapes suivantes.

3. Configurer Google Cloud Functions (GCF)

Pour configurer GCF, nous allons exécuter des commandes dans Google Cloud Shell.

Google Cloud peut être utilisé à distance depuis votre ordinateur portable à l'aide de l'outil de ligne de commande gcloud. Toutefois, dans cet atelier de programmation, nous allons utiliser Google Cloud Shell, un environnement de ligne de commande exécuté dans le cloud.

Cette machine virtuelle basée sur Debian contient tous les outils de développement dont vous aurez besoin. Elle intègre un répertoire d'accueil persistant de 5 Go et s'exécute sur Google Cloud, ce qui améliore nettement les performances du réseau et l'authentification. Cela signifie que tout ce dont vous avez besoin pour cet atelier de programmation est un navigateur (oui, tout fonctionne sur un Chromebook).

Pour activer Google Cloud Shell, cliquez sur le bouton en haut à droite dans la Play Console (le provisionnement et la connexion à l'environnement ne devraient pas prendre plus de quelques minutes):

Accorder des autorisations de signature d'objet blob au compte de service Cloud Functions

Pour que GCF s'authentifie auprès de Cloud IAP, le proxy qui protège le serveur Web Airflow, vous devez attribuer le rôle Service Account Token Creator au GCF du compte de service Appspot. Pour ce faire, exécutez la commande suivante dans Cloud Shell, en remplaçant <your-project-id> par le nom de votre projet.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Par exemple, si votre projet s'appelle my-project, votre commande est la suivante :

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Obtenir l'ID client

Pour créer un jeton permettant de s'authentifier auprès de Cloud IAP, la fonction requiert l'ID client du proxy qui protège le serveur Web Airflow. L'API Cloud Composer ne fournit pas ces informations directement. Envoyez plutôt une requête non authentifiée au serveur Web Airflow et capturez l'ID client à partir de l'URL de redirection. Pour ce faire, nous allons exécuter un fichier Python à l'aide de Cloud Shell pour capturer l'ID client.

Téléchargez le code nécessaire depuis GitHub en exécutant la commande suivante dans Cloud Shell

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Si vous recevez un message d'erreur parce que ce répertoire existe déjà, mettez-le à jour vers la dernière version en exécutant la commande suivante :

cd python-docs-samples/
git pull origin master

Accédez au répertoire approprié en exécutant

cd python-docs-samples/composer/rest

Exécutez le code Python pour obtenir votre ID client. Remplacez <your-project-id> par le nom de votre projet, <your-composer-location> par l'emplacement de l'environnement Composer que vous avez créé précédemment et <your-composer-environment> par le nom de l'environnement Composer que vous avez créé précédemment.

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

Par exemple, si le nom de votre projet est my-project, que votre emplacement Composer est us-central1 et que le nom de votre environnement est my-composer, votre commande est

python3 get_client_id.py my-project us-central1 my-composer

La fonction get_client_id.py effectue les opérations suivantes :

  • S'authentifier avec Google Cloud
  • Envoie une requête HTTP non authentifiée au serveur Web Airflow afin d'obtenir l'URI de redirection.
  • Il extrait le paramètre de requête client_id de cette redirection.
  • l'imprime pour que vous puissiez l'utiliser ;

Votre ID client est imprimé sur la ligne de commande et ressemble à ceci:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. Créer votre fonction

Dans Cloud Shell, clonez le dépôt avec l'exemple de code nécessaire en exécutant

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Accédez au répertoire nécessaire et laissez Cloud Shell ouvert pendant que vous effectuez les étapes suivantes.

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

Accédez à la page Google Cloud Functions en cliquant sur le menu de navigation, puis sur "Cloud Functions".

Cliquez sur CRÉER UNE FONCTION. en haut de la page

Nommez votre fonction "my-function" et laissez la mémoire par défaut, 256 Mo.

Définissez le déclencheur sur "Cloud Storage", laissez le type d'événement sur "Finaliser/Créer", puis accédez au bucket que vous avez créé à l'étape "Créer un bucket Cloud Storage".

Laissez le code source défini sur "Éditeur intégré". et définissez l'environnement d'exécution sur "Node.js 8".

Dans Cloud Shell, exécutez la commande suivante. Les fichiers index.js et package.json s'ouvrent alors dans l'éditeur Cloud Shell.

cloudshell edit index.js package.json

Cliquez sur l'onglet package.json, copiez ce code et collez-le dans la section package.json de l'éditeur intégré Cloud Functions.

Définir la fonction à exécuter pour triggerDag

Cliquez sur l'onglet "index.js", copiez le code, puis collez-le dans la section "index.js" de l'éditeur intégré Cloud Functions.

Remplacez PROJECT_ID par votre ID de projet et CLIENT_ID par l'ID client que vous avez enregistré à l'étape "Obtenir l'ID client". NE cliquez PAS sur "Créer". pour le moment, il reste encore quelques choses à remplir.

Dans Cloud Shell, exécutez la commande suivante en remplaçant <your-environment-name> par le nom de votre environnement Composer et <your-composer-region> par la région où se trouve votre environnement Composer.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

Par exemple, si votre environnement s'appelle my-composer-environment et se trouve dans us-central1, votre commande est

gcloud composer environments describe my-composer-environment --location us-central1

Le résultat devrait ressembler à ceci :

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

Dans ce résultat, recherchez la variable appelée airflowUri. Dans votre code index.js, remplacez WEBSERVER_ID par l'ID du serveur Web Airflow. Il s'agit de la partie de la variable airflowUri qui comporte "-tp". à la fin, par exemple abc123efghi456k-tp

Cliquez sur le bouton "Plus" puis sélectionnez la région géographiquement la plus proche de vous.

Cochez la case "Réessayer après échec".

Cliquez sur "Créer". pour créer une fonction Cloud

Examiner le code

Le code que vous avez copié depuis index.js se présente comme suit:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Voyons ce qui se passe. Trois fonctions sont ici: triggerDag, authorizeIap et makeIapPostRequest.

triggerDag est la fonction qui se déclenche lorsque nous importons un élément dans le bucket Cloud Storage désigné. C'est là que nous configurons les variables importantes utilisées dans les autres requêtes, comme PROJECT_ID, CLIENT_ID, WEBSERVER_ID et DAG_NAME. Elle appelle authorizeIap et makeIapPostRequest.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap envoie une requête au proxy qui protège le serveur Web Airflow, à l'aide d'un compte de service et en "échange" Un jeton JWT pour un jeton d'ID qui sera utilisé pour authentifier makeIapPostRequest.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest appelle le serveur Web Airflow pour déclencher la composer_sample_trigger_response_dag.. Le nom du DAG est intégré à l'URL du serveur Web Airflow transmise avec le paramètre url, et idToken est le jeton obtenu dans la requête authorizeIap.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. Configurer votre DAG

Dans Cloud Shell, accédez au répertoire contenant les exemples de workflows. Il fait partie de l'exemple python-docs-samples téléchargé depuis GitHub à l'étape "Obtenir l'ID client".

cd
cd python-docs-samples/composer/workflows

Importer le DAG dans Composer

Importez l'exemple de DAG dans le bucket de stockage du DAG de votre environnement Composer à l'aide de la commande suivante, où <environment_name> est le nom de votre environnement Composer et <location> le nom de la région dans laquelle il se trouve. trigger_response_dag.py est le DAG sur lequel nous allons travailler.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

Par exemple, si votre environnement Composer a été nommé my-composer et se trouve dans us-central1, votre commande est la suivante :

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

Exploration du DAG

Voici à quoi ressemble le code du DAG dans trigger_response.py

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

La section default_args contient les arguments par défaut requis par le modèle BaseOperator dans Apache Airflow. Cette section s'affiche avec ces paramètres dans n'importe quel DAG Apache Airflow. owner est actuellement défini sur Composer Example, mais vous pouvez le remplacer par votre nom si vous le souhaitez. depends_on_past indique que ce DAG ne dépend d'aucun DAG antérieur. Les trois sections de l'e-mail, email, email_on_failure et email_on_retry, sont définies de sorte qu'aucune notification par e-mail ne soit envoyée en fonction de l'état de ce DAG. Le DAG ne relancera qu'une seule tentative, car retries est défini sur 1, et le fera après cinq minutes, par retry_delay. Le start_date indique normalement quand un DAG doit s'exécuter, conjointement avec son schedule_interval (défini ultérieurement), mais n'est pas pertinent dans le cas de ce DAG. Il est défini pour le 1er janvier 2017, mais peut être défini sur n'importe quelle date passée.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

La section with airflow.DAG configure le DAG qui sera exécuté. Elle sera exécutée avec l'ID de tâche composer_sample_trigger_response_dag, les arguments par défaut de la section default_args et, surtout, avec un schedule_interval défini sur None. schedule_interval est défini sur None, car nous déclenchons ce DAG spécifique avec notre fonction Cloud. C'est pourquoi le start_date dans default_args n'est pas pertinent.

Lors de son exécution, le DAG imprime sa configuration, comme indiqué dans la variable print_gcs_info.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. Tester votre fonction

Ouvrez votre environnement Composer et, dans la ligne contenant le nom de votre environnement, cliquez sur le lien Airflow.

Ouvrez composer_sample_trigger_response_dag en cliquant sur son nom. Pour le moment, il n'y aura aucune preuve d'exécution du DAG, car nous n'avons pas encore déclenché son exécution.Si ce DAG n'est pas visible ou cliquable, attendez une minute et actualisez la page.

Ouvrez un onglet distinct et importez n'importe quel fichier dans le bucket Cloud Storage que vous avez créé précédemment et que vous avez spécifié comme déclencheur de votre fonction Cloud. Pour ce faire, accédez à la console ou utilisez une commande gsutil.

Revenez à l'onglet contenant votre interface utilisateur Airflow, puis cliquez sur Graph View (Vue graphique).

Cliquez sur la tâche print_gcs_info, qui devrait être en vert.

Cliquez sur "Afficher le journal". en haut à droite du menu

Les journaux contiennent des informations sur le fichier que vous avez importé dans votre bucket Cloud Storage.

Félicitations ! Vous venez de déclencher un DAG Airflow à l'aide de Node.js et de Google Cloud Functions.

7. Nettoyage

Pour éviter que les ressources utilisées dans ce guide de démarrage rapide soient facturées sur votre compte GCP, procédez comme suit:

  1. (Facultatif) Pour enregistrer vos données, téléchargez les données depuis le bucket Cloud Storage pour l'environnement Cloud Composer et le bucket de stockage que vous avez créé pour ce guide de démarrage rapide.
  2. Supprimez le bucket Cloud Storage correspondant à l'environnement et que vous avez créé.
  3. Supprimez l'environnement Cloud Composer. Notez que la suppression de votre environnement n'entraîne pas la suppression du bucket de stockage pour l'environnement.
  4. (Facultatif) Avec l'informatique sans serveur, les deux premiers millions d'appels par mois sont sans frais. Lorsque vous faites passer votre fonction à zéro, aucuns frais ne vous sont facturés (consultez les tarifs pour en savoir plus). Toutefois, si vous souhaitez supprimer votre fonction Cloud, cliquez sur "SUPPRIMER". en haut à droite de la page de présentation de votre fonction

4fe11e1b41b32ba2.png

Vous pouvez également supprimer le projet (facultatif) :

  1. Dans la console GCP, accédez à la page Projets.
  2. Dans la liste des projets, sélectionnez celui que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la zone prévue à cet effet, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.