DAG mit Node.JS und Google Cloud Functions auslösen

1. Einführung

Apache Airflow ist so konzipiert, dass DAGs nach einem regelmäßigen Zeitplan ausgeführt werden. Sie können DAGs jedoch auch als Reaktion auf Ereignisse auslösen, z. B. bei einer Änderung in einem Cloud Storage-Bucket oder einer Nachricht, die an Cloud Pub/Sub gesendet wird. Zu diesem Zweck können Cloud Composer-DAGs über Cloud Functions ausgelöst werden.

Im Beispiel in diesem Lab wird bei jeder Änderung in einem Cloud Storage-Bucket ein einfacher DAG ausgeführt. Dieser DAG verwendet den BashOperator, um einen Bash-Befehl auszuführen, der die Änderungsinformationen zu den in den Cloud Storage-Bucket hochgeladenen Daten ausgibt.

Bevor Sie mit diesem Lab beginnen, sollten Sie die Codelabs Einführung in Cloud Composer und Erste Schritte mit Cloud Functions abgeschlossen haben. Wenn Sie im Codelab „Einführung in Cloud Composer“ eine Composer-Umgebung erstellen, können Sie diese Umgebung in diesem Lab verwenden.

Inhalt

Aufgaben in diesem Codelab:

  1. Laden Sie eine Datei in Google Cloud Storage hoch,
  2. Google Cloud Functions-Funktion mithilfe der Node.JS-Laufzeit auslösen
  3. Diese Funktion führt einen DAG in Google Cloud Composer aus.
  4. Er führt einen einfachen Bash-Befehl aus, der die Änderung im Google Cloud Storage-Bucket ausgibt.

1d3d3736624a923f.png

Lerninhalte

  • Apache Airflow-DAG mit Google Cloud Functions und Node.js auslösen

Voraussetzungen

  • GCP-Konto
  • Grundlegendes Verständnis von JavaScript
  • Grundkenntnisse in Cloud Composer/Airflow und Cloud Functions
  • Einfache Verwendung von CLI-Befehlen

2. GCP einrichten

Projekt auswählen oder erstellen

Wählen Sie ein Google Cloud Platform-Projekt aus oder erstellen Sie eines. Wenn Sie ein neues Projekt erstellen, folgen Sie der Anleitung hier.

Notieren Sie sich Ihre Projekt-ID. Sie benötigen sie in späteren Schritten.

Wenn Sie ein neues Projekt erstellen, finden Sie die Projekt-ID direkt unter dem Projektnamen auf der Erstellungsseite.

Wenn Sie bereits ein Projekt erstellt haben, finden Sie die ID auf der Startseite der Console in der Projektinfokarte.

APIs aktivieren

Aktivieren Sie die Cloud Composer, Google Cloud Functions, Cloud Identity und die Google Identity and Access Management (IAM) API.

Composer-Umgebung erstellen

Erstellen Sie eine Cloud Composer-Umgebung mit der folgenden Konfiguration:

  • Name: my-composer-environment
  • Standort: der Standort, der Ihnen am nächsten liegt
  • Zone: jede Zone in dieser Region

Für alle anderen Konfigurationen können die Standardeinstellungen beibehalten werden. Klicken Sie auf „Erstellen“. Notieren Sie sich den Namen und den Standort der Composer-Umgebung, da Sie diese in den nächsten Schritten benötigen.

Cloud Storage-Bucket erstellen

Erstellen Sie in Ihrem Projekt einen Cloud Storage-Bucket mit der folgenden Konfiguration:

  • Name: <your-project-id>
  • Standardspeicherklasse: Multiregional
  • Standort: Der Standort, der der verwendeten Cloud Composer-Region am nächsten liegt
  • Zugriffssteuerungsmodell: Berechtigungen auf Objekt- und Bucket-Ebene festlegen

Klicken Sie auf „Erstellen“. Notieren Sie sich für spätere Schritte den Namen Ihres Cloud Storage-Buckets.

3. Google Cloud Functions (GCF) einrichten

Zur Einrichtung von GCF werden Befehle in Google Cloud Shell ausgeführt.

Sie können Google Cloud mit dem gcloud-Befehlszeilentool von Ihrem Laptop aus aus der Ferne bedienen. In diesem Codelab verwenden wir jedoch Google Cloud Shell, eine Befehlszeilenumgebung, die in der Cloud ausgeführt wird.

Diese Debian-basierte virtuelle Maschine verfügt über alle erforderlichen Entwicklungstools. Sie bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und läuft auf der Google Cloud, wodurch Netzwerkleistung und Authentifizierung deutlich verbessert werden. Für dieses Codelab benötigen Sie also nur einen Browser – ja, er funktioniert auf Chromebooks.

Klicken Sie zur Aktivierung von Google Cloud Shell in der Entwicklerkonsole auf die Schaltfläche oben rechts (es dauert nur einen Moment, bis die Umgebung bereitgestellt und eine Verbindung hergestellt werden kann):

Cloud Functions-Dienstkonto Berechtigungen zur Blobsignatur gewähren

Damit sich GCF bei Cloud IAP, dem Proxy, der den Airflow-Webserver schützt, authentifizieren kann, müssen Sie dem GCF-Dienstkonto des Appspot-Dienstkontos die Rolle Service Account Token Creator zuweisen. Führen Sie dazu den folgenden Befehl in Cloud Shell aus und ersetzen Sie <your-project-id> durch den Namen Ihres Projekts.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Wenn Ihr Projekt beispielsweise my-project heißt, lautet Ihr Befehl

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Client-ID abrufen

Zum Erstellen eines Tokens für die Authentifizierung bei Cloud IAP benötigt die Funktion die Client-ID des Proxys, der den Airflow-Webserver schützt. Die Cloud Composer API stellt diese Informationen nicht direkt zur Verfügung. Stellen Sie stattdessen eine nicht authentifizierte Anfrage an den Airflow-Webserver und erfassen Sie die Client-ID aus der Weiterleitungs-URL. Dazu führen wir mit Cloud Shell eine Python-Datei aus, um die Client-ID zu erfassen.

Laden Sie den erforderlichen Code von GitHub herunter, indem Sie den folgenden Befehl in Cloud Shell ausführen.

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Wenn eine Fehlermeldung angezeigt wird, weil dieses Verzeichnis bereits vorhanden ist, aktualisieren Sie es auf die neueste Version, indem Sie den folgenden Befehl ausführen

cd python-docs-samples/
git pull origin master

Wechseln Sie mit dem folgenden Befehl in das entsprechende Verzeichnis:

cd python-docs-samples/composer/rest

Führen Sie den Python-Code aus, um Ihre Client-ID abzurufen. Ersetzen Sie dabei <your-project-id> durch den Namen Ihres Projekts, <your-composer-location> durch den Speicherort der zuvor erstellten Composer-Umgebung und <your-composer-environment> durch den Namen der zuvor erstellten Composer-Umgebung.

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

Wenn der Projektname beispielsweise my-project lautet, der Composer-Speicherort us-central1 und der Name der Umgebung my-composer lautet, lautet der Befehl wie folgt:

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py führt Folgendes aus:

  • Authentifizierung bei Google Cloud
  • Sendet eine nicht authentifizierte HTTP-Anfrage an den Airflow-Webserver, um den Weiterleitungs-URI abzurufen
  • Extrahiert den Abfrageparameter client_id aus dieser Weiterleitung
  • Druckt sie aus, damit du sie verwenden kannst

Ihre Client-ID wird in der Befehlszeile ausgegeben und sieht in etwa so aus:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. Funktion erstellen

Klonen Sie in Cloud Shell das Repository mit dem erforderlichen Beispielcode. Führen Sie dazu folgenden Befehl aus:

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Wechseln Sie zum erforderlichen Verzeichnis und lassen Sie Cloud Shell für die nächsten Schritte geöffnet.

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

Rufen Sie die Seite „Google Cloud Functions“ auf, indem Sie auf das Navigationsmenü und dann auf „Cloud Functions“ klicken.

Klicken Sie auf FUNKTION ERSTELLEN. oben auf der Seite

Nennen Sie die Funktion „my-function“. und belassen Sie den Standardwert von 256 MB.

Legen Sie den Trigger auf „Cloud Storage“ fest, behalten Sie den Ereignistyp „Abschließen/Erstellen“ bei und suchen Sie den Bucket, den Sie im Schritt „Cloud Storage-Bucket erstellen“ erstellt haben.

Belassen Sie den Quellcode auf "Inline Editor" (Inline-Editor). und legen Sie die Laufzeit auf "Node.js 8" fest.

Führen Sie in Cloud Shell den folgenden Befehl aus. Dadurch werden „index.js“ und „package.json“ im Cloud Shell-Editor geöffnet.

cloudshell edit index.js package.json

Klicken Sie auf den Tab „package.json“, kopieren Sie den Code und fügen Sie ihn im Inline-Editor von Cloud Functions in den Abschnitt „package.json“ ein.

Legen Sie unter „Auszuführende Funktion“ fest, auslösenDag

Klicken Sie auf den Tab „index.js“, kopieren Sie den Code und fügen Sie ihn im Inline-Editor von Cloud Functions in den Abschnitt „index.js“ ein.

Ändern Sie PROJECT_ID in Ihre Projekt-ID und CLIENT_ID in die Client-ID, die Sie im Schritt „Client-ID abrufen“ gespeichert haben. Klicken Sie NICHT auf „Erstellen“. Trotzdem müssen wir noch ein paar Angaben machen.

Führen Sie in Cloud Shell den folgenden Befehl aus und ersetzen Sie dabei <your-environment-name>. durch den Namen Ihrer Composer-Umgebung und <your-composer-region> durch die Region, in der sich die Composer-Umgebung befindet.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

Wenn Ihre Umgebung beispielsweise den Namen my-composer-environment hat und sich in us-central1 befindet, lautet der Befehl

gcloud composer environments describe my-composer-environment --location us-central1

Die Ausgabe sollte in etwa so aussehen:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

Suchen Sie in dieser Ausgabe nach der Variablen namens airflowUri. Ändern Sie die WEBSERVER_ID in Ihrem index.js-Code in die Airflow-Webserver-ID. Er ist der Teil der airflowUri-Variable, der das Präfix „-tp“ enthält. am Ende, z. B. abc123efghi456k-tp

Klicken Sie auf das Dreipunkt-Menü Dropdown-Link und wählen Sie dann die Region aus, die geografisch am nächsten liegt.

Klicken Sie das Kästchen „Bei Fehler noch einmal versuchen“ an.

Klicken Sie auf „Erstellen“. zum Erstellen Ihrer Cloud Functions-Funktion

Den Code durchgehen

Der Code, den Sie aus „index.js“ kopiert haben, sieht in etwa so aus:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Sehen wir uns an, was los ist. Hier gibt es drei Funktionen: triggerDag, authorizeIap und makeIapPostRequest.

triggerDag ist die Funktion, die ausgelöst wird, wenn etwas in den festgelegten Cloud Storage-Bucket hochgeladen wird. Hier konfigurieren wir wichtige Variablen, die in den anderen Anfragen verwendet werden, z. B. PROJECT_ID, CLIENT_ID, WEBSERVER_ID und DAG_NAME. Sie ruft authorizeIap und makeIapPostRequest auf.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap stellt mithilfe eines Dienstkontos und „Austausch“ eine Anfrage an den Proxy, der den Airflow-Webserver schützt. Ein JWT für ein ID-Token, das zur Authentifizierung von makeIapPostRequest verwendet wird.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest ruft den Airflow-Webserver auf, um composer_sample_trigger_response_dag. auszulösen. Der DAG-Name ist in die URL des Airflow-Webservers eingebettet, die mit dem url-Parameter übergeben wird, und idToken ist das Token, das wir in der authorizeIap-Anfrage abgerufen haben.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. DAG einrichten

Wechseln Sie in Cloud Shell zum Verzeichnis mit den Beispielworkflows. Es ist Teil der Python-docs-Beispiele, die Sie im Schritt "Client-ID abrufen" von GitHub heruntergeladen haben.

cd
cd python-docs-samples/composer/workflows

DAG in Composer hochladen

Laden Sie den Beispiel-DAG mit dem folgenden Befehl in den DAG-Speicher-Bucket Ihrer Composer-Umgebung hoch. Dabei ist <environment_name> der Name der Composer-Umgebung und <location> der Name der Region, in der sie sich befindet. trigger_response_dag.py ist der DAG, mit dem wir zusammenarbeiten.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

Wenn Ihre Composer-Umgebung beispielsweise den Namen my-composer hatte und sich in us-central1 befindet, lautet der Befehl

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

DAG durchgehen

Der DAG-Code in trigger_response.py sieht so aus:

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

Der Abschnitt default_args enthält die Standardargumente, die für das BaseOperator-Modell in Apache Airflow erforderlich sind. Dieser Abschnitt würde mit diesen Parametern in jedem Apache Airflow-DAG angezeigt werden. Für owner ist momentan Composer Example festgelegt, aber du kannst diesen Namen in deinen Namen ändern. depends_on_past zeigt, dass dieser DAG nicht von vorherigen DAGs abhängig ist. Die drei E-Mail-Bereiche email, email_on_failure und email_on_retry sind so festgelegt, dass keine E-Mail-Benachrichtigungen basierend auf dem Status dieses DAG eingehen. Der DAG wiederholt den Vorgang nur einmal, da retries auf 1 gesetzt ist, und zwar nach fünf Minuten pro retry_delay. start_date gibt normalerweise vor, wann ein DAG in Verbindung mit seinem (später festgelegten) schedule_interval ausgeführt werden soll. Im Fall dieses DAG ist dies jedoch nicht relevant. Als Datum ist der 1. Januar 2017 festgelegt, es kann aber auch ein Datum in der Vergangenheit liegen.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

Im Bereich with airflow.DAG wird der auszuführende DAG konfiguriert. Er wird mit der Aufgaben-ID composer_sample_trigger_response_dag, den Standardargumenten aus dem Abschnitt default_args und, vor allem mit dem schedule_interval None, ausgeführt. schedule_interval ist auf None gesetzt, da wir diesen bestimmten DAG mit unserer Cloud Functions-Funktion auslösen. Aus diesem Grund ist die start_date auf default_args nicht relevant.

Bei der Ausführung gibt der DAG seine Konfiguration aus, die in der Variablen print_gcs_info festgelegt ist.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. Funktion testen

Öffnen Sie die Composer-Umgebung und klicken Sie in der Zeile mit dem Umgebungsnamen auf den Airflow-Link.

Öffnen Sie die composer_sample_trigger_response_dag, indem Sie auf den Namen klicken. Derzeit gibt es keine Hinweise auf DAG-Ausführungen, da wir die Ausführung des DAG noch nicht ausgelöst haben.Wenn dieser DAG nicht sichtbar oder anklickbar ist, warten Sie eine Minute und aktualisieren Sie die Seite.

Öffnen Sie einen separaten Tab und laden Sie eine Datei in den Cloud Storage-Bucket hoch, den Sie zuvor erstellt und als Trigger für Ihre Cloud Functions-Funktion angegeben haben. Dazu können Sie die Console oder den gsutil-Befehl verwenden.

Gehen Sie zurück zum Tab mit der Airflow-Benutzeroberfläche und klicken Sie auf „Graph View“ (Diagrammansicht).

Klicken Sie auf die Aufgabe print_gcs_info, die grün markiert sein sollte.

Klicken Sie auf „Protokoll anzeigen“. oben rechts im Menü

Die Logs enthalten Informationen zu der Datei, die Sie in Ihren Cloud Storage-Bucket hochgeladen haben.

Glückwunsch! Sie haben soeben einen Airflow-DAG mit Node.js und Google Cloud Functions ausgelöst.

7. Bereinigen

So vermeiden Sie, dass Ihrem GCP-Konto die in dieser Kurzanleitung verwendeten Ressourcen in Rechnung gestellt werden:

  1. Optional: Laden Sie die Daten aus dem Cloud Storage-Bucket für die Cloud Composer-Umgebung und aus dem Storage-Bucket herunter, den Sie für diese Kurzanleitung erstellt haben, um Ihre Daten zu speichern.
  2. Löschen Sie den Cloud Storage-Bucket für die Umgebung und den von Ihnen erstellten Bucket.
  3. Löschen Sie die Cloud Composer-Umgebung. Der Storage-Bucket für die Umgebung wird durch das Löschen der Umgebung nicht gelöscht.
  4. (Optional) Beim serverlosen Computing sind die ersten 2 Millionen Aufrufe pro Monat kostenlos. Wenn Sie die Funktion auf null skalieren, fallen keine Kosten an. Weitere Informationen finden Sie unter Preise. Wenn Sie Ihre Cloud Functions-Funktion jedoch löschen möchten, klicken Sie auf „LÖSCHEN“. oben rechts auf der Übersichtsseite der Funktion

4fe11e1b41b32ba2.png

Optional können Sie das Projekt auch löschen:

  1. Rufen Sie in der GCP Console die Seite Projekte auf.
  2. Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
  3. Geben Sie die Projekt-ID in das Feld ein und klicken Sie auf Beenden, um das Projekt zu löschen.