1. Einführung
Apache Airflow ist so konzipiert, dass DAGs nach einem regelmäßigen Zeitplan ausgeführt werden. Sie können DAGs jedoch auch als Reaktion auf Ereignisse auslösen, z. B. bei einer Änderung in einem Cloud Storage-Bucket oder einer Nachricht, die an Cloud Pub/Sub gesendet wird. Zu diesem Zweck können Cloud Composer-DAGs über Cloud Functions ausgelöst werden.
Im Beispiel in diesem Lab wird bei jeder Änderung in einem Cloud Storage-Bucket ein einfacher DAG ausgeführt. Dieser DAG verwendet den BashOperator, um einen Bash-Befehl auszuführen, der die Änderungsinformationen zu den in den Cloud Storage-Bucket hochgeladenen Daten ausgibt.
Bevor Sie mit diesem Lab beginnen, sollten Sie die Codelabs Einführung in Cloud Composer und Erste Schritte mit Cloud Functions abgeschlossen haben. Wenn Sie im Codelab „Einführung in Cloud Composer“ eine Composer-Umgebung erstellen, können Sie diese Umgebung in diesem Lab verwenden.
Inhalt
Aufgaben in diesem Codelab:
- Laden Sie eine Datei in Google Cloud Storage hoch,
- Google Cloud Functions-Funktion mithilfe der Node.JS-Laufzeit auslösen
- Diese Funktion führt einen DAG in Google Cloud Composer aus.
- Er führt einen einfachen Bash-Befehl aus, der die Änderung im Google Cloud Storage-Bucket ausgibt.
Lerninhalte
- Apache Airflow-DAG mit Google Cloud Functions und Node.js auslösen
Voraussetzungen
- GCP-Konto
- Grundlegendes Verständnis von JavaScript
- Grundkenntnisse in Cloud Composer/Airflow und Cloud Functions
- Einfache Verwendung von CLI-Befehlen
2. GCP einrichten
Projekt auswählen oder erstellen
Wählen Sie ein Google Cloud Platform-Projekt aus oder erstellen Sie eines. Wenn Sie ein neues Projekt erstellen, folgen Sie der Anleitung hier.
Notieren Sie sich Ihre Projekt-ID. Sie benötigen sie in späteren Schritten.
Wenn Sie ein neues Projekt erstellen, finden Sie die Projekt-ID direkt unter dem Projektnamen auf der Erstellungsseite. | |
Wenn Sie bereits ein Projekt erstellt haben, finden Sie die ID auf der Startseite der Console in der Projektinfokarte. |
APIs aktivieren
Composer-Umgebung erstellen
Erstellen Sie eine Cloud Composer-Umgebung mit der folgenden Konfiguration:
Für alle anderen Konfigurationen können die Standardeinstellungen beibehalten werden. Klicken Sie auf „Erstellen“. Notieren Sie sich den Namen und den Standort der Composer-Umgebung, da Sie diese in den nächsten Schritten benötigen. |
Cloud Storage-Bucket erstellen
Erstellen Sie in Ihrem Projekt einen Cloud Storage-Bucket mit der folgenden Konfiguration:
Klicken Sie auf „Erstellen“. Notieren Sie sich für spätere Schritte den Namen Ihres Cloud Storage-Buckets. |
3. Google Cloud Functions (GCF) einrichten
Zur Einrichtung von GCF werden Befehle in Google Cloud Shell ausgeführt.
Sie können Google Cloud mit dem gcloud-Befehlszeilentool von Ihrem Laptop aus aus der Ferne bedienen. In diesem Codelab verwenden wir jedoch Google Cloud Shell, eine Befehlszeilenumgebung, die in der Cloud ausgeführt wird.
Diese Debian-basierte virtuelle Maschine verfügt über alle erforderlichen Entwicklungstools. Sie bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und läuft auf der Google Cloud, wodurch Netzwerkleistung und Authentifizierung deutlich verbessert werden. Für dieses Codelab benötigen Sie also nur einen Browser – ja, er funktioniert auf Chromebooks.
Klicken Sie zur Aktivierung von Google Cloud Shell in der Entwicklerkonsole auf die Schaltfläche oben rechts (es dauert nur einen Moment, bis die Umgebung bereitgestellt und eine Verbindung hergestellt werden kann): |
Cloud Functions-Dienstkonto Berechtigungen zur Blobsignatur gewähren
Damit sich GCF bei Cloud IAP, dem Proxy, der den Airflow-Webserver schützt, authentifizieren kann, müssen Sie dem GCF-Dienstkonto des Appspot-Dienstkontos die Rolle Service Account Token Creator
zuweisen. Führen Sie dazu den folgenden Befehl in Cloud Shell aus und ersetzen Sie <your-project-id>
durch den Namen Ihres Projekts.
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Wenn Ihr Projekt beispielsweise my-project
heißt, lautet Ihr Befehl
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Client-ID abrufen
Zum Erstellen eines Tokens für die Authentifizierung bei Cloud IAP benötigt die Funktion die Client-ID des Proxys, der den Airflow-Webserver schützt. Die Cloud Composer API stellt diese Informationen nicht direkt zur Verfügung. Stellen Sie stattdessen eine nicht authentifizierte Anfrage an den Airflow-Webserver und erfassen Sie die Client-ID aus der Weiterleitungs-URL. Dazu führen wir mit Cloud Shell eine Python-Datei aus, um die Client-ID zu erfassen.
Laden Sie den erforderlichen Code von GitHub herunter, indem Sie den folgenden Befehl in Cloud Shell ausführen.
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
Wenn eine Fehlermeldung angezeigt wird, weil dieses Verzeichnis bereits vorhanden ist, aktualisieren Sie es auf die neueste Version, indem Sie den folgenden Befehl ausführen
cd python-docs-samples/ git pull origin master
Wechseln Sie mit dem folgenden Befehl in das entsprechende Verzeichnis:
cd python-docs-samples/composer/rest
Führen Sie den Python-Code aus, um Ihre Client-ID abzurufen. Ersetzen Sie dabei <your-project-id>
durch den Namen Ihres Projekts, <your-composer-location>
durch den Speicherort der zuvor erstellten Composer-Umgebung und <your-composer-environment>
durch den Namen der zuvor erstellten Composer-Umgebung.
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
Wenn der Projektname beispielsweise my-project
lautet, der Composer-Speicherort us-central1
und der Name der Umgebung my-composer
lautet, lautet der Befehl wie folgt:
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py
führt Folgendes aus:
- Authentifizierung bei Google Cloud
- Sendet eine nicht authentifizierte HTTP-Anfrage an den Airflow-Webserver, um den Weiterleitungs-URI abzurufen
- Extrahiert den Abfrageparameter
client_id
aus dieser Weiterleitung - Druckt sie aus, damit du sie verwenden kannst
Ihre Client-ID wird in der Befehlszeile ausgegeben und sieht in etwa so aus:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. Funktion erstellen
Klonen Sie in Cloud Shell das Repository mit dem erforderlichen Beispielcode. Führen Sie dazu folgenden Befehl aus:
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
Wechseln Sie zum erforderlichen Verzeichnis und lassen Sie Cloud Shell für die nächsten Schritte geöffnet.
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
Rufen Sie die Seite „Google Cloud Functions“ auf, indem Sie auf das Navigationsmenü und dann auf „Cloud Functions“ klicken. | |
Klicken Sie auf FUNKTION ERSTELLEN. oben auf der Seite | |
Nennen Sie die Funktion „my-function“. und belassen Sie den Standardwert von 256 MB. | |
Legen Sie den Trigger auf „Cloud Storage“ fest, behalten Sie den Ereignistyp „Abschließen/Erstellen“ bei und suchen Sie den Bucket, den Sie im Schritt „Cloud Storage-Bucket erstellen“ erstellt haben. | |
Belassen Sie den Quellcode auf "Inline Editor" (Inline-Editor). und legen Sie die Laufzeit auf "Node.js 8" fest. |
Führen Sie in Cloud Shell den folgenden Befehl aus. Dadurch werden „index.js“ und „package.json“ im Cloud Shell-Editor geöffnet.
cloudshell edit index.js package.json
Klicken Sie auf den Tab „package.json“, kopieren Sie den Code und fügen Sie ihn im Inline-Editor von Cloud Functions in den Abschnitt „package.json“ ein. | |
Legen Sie unter „Auszuführende Funktion“ fest, auslösenDag | |
Klicken Sie auf den Tab „index.js“, kopieren Sie den Code und fügen Sie ihn im Inline-Editor von Cloud Functions in den Abschnitt „index.js“ ein. | |
Ändern Sie |
Führen Sie in Cloud Shell den folgenden Befehl aus und ersetzen Sie dabei <your-environment-name>. durch den Namen Ihrer Composer-Umgebung und <your-composer-region> durch die Region, in der sich die Composer-Umgebung befindet.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
Wenn Ihre Umgebung beispielsweise den Namen my-composer-environment
hat und sich in us-central1
befindet, lautet der Befehl
gcloud composer environments describe my-composer-environment --location us-central1
Die Ausgabe sollte in etwa so aussehen:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
Suchen Sie in dieser Ausgabe nach der Variablen namens | |
Klicken Sie auf das Dreipunkt-Menü Dropdown-Link und wählen Sie dann die Region aus, die geografisch am nächsten liegt. | |
Klicken Sie das Kästchen „Bei Fehler noch einmal versuchen“ an. | |
Klicken Sie auf „Erstellen“. zum Erstellen Ihrer Cloud Functions-Funktion |
Den Code durchgehen
Der Code, den Sie aus „index.js“ kopiert haben, sieht in etwa so aus:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
Sehen wir uns an, was los ist. Hier gibt es drei Funktionen: triggerDag
, authorizeIap
und makeIapPostRequest
.
triggerDag
ist die Funktion, die ausgelöst wird, wenn etwas in den festgelegten Cloud Storage-Bucket hochgeladen wird. Hier konfigurieren wir wichtige Variablen, die in den anderen Anfragen verwendet werden, z. B. PROJECT_ID
, CLIENT_ID
, WEBSERVER_ID
und DAG_NAME
. Sie ruft authorizeIap
und makeIapPostRequest
auf.
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap
stellt mithilfe eines Dienstkontos und „Austausch“ eine Anfrage an den Proxy, der den Airflow-Webserver schützt. Ein JWT für ein ID-Token, das zur Authentifizierung von makeIapPostRequest
verwendet wird.
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest
ruft den Airflow-Webserver auf, um composer_sample_trigger_response_dag.
auszulösen. Der DAG-Name ist in die URL des Airflow-Webservers eingebettet, die mit dem url
-Parameter übergeben wird, und idToken
ist das Token, das wir in der authorizeIap
-Anfrage abgerufen haben.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. DAG einrichten
Wechseln Sie in Cloud Shell zum Verzeichnis mit den Beispielworkflows. Es ist Teil der Python-docs-Beispiele, die Sie im Schritt "Client-ID abrufen" von GitHub heruntergeladen haben.
cd cd python-docs-samples/composer/workflows
DAG in Composer hochladen
Laden Sie den Beispiel-DAG mit dem folgenden Befehl in den DAG-Speicher-Bucket Ihrer Composer-Umgebung hoch. Dabei ist <environment_name>
der Name der Composer-Umgebung und <location>
der Name der Region, in der sie sich befindet. trigger_response_dag.py
ist der DAG, mit dem wir zusammenarbeiten.
gcloud composer environments storage dags import \ --environment <environment_name> \ --location <location> \ --source trigger_response_dag.py
Wenn Ihre Composer-Umgebung beispielsweise den Namen my-composer
hatte und sich in us-central1
befindet, lautet der Befehl
gcloud composer environments storage dags import \ --environment my-composer \ --location us-central1 \ --source trigger_response_dag.py
DAG durchgehen
Der DAG-Code in trigger_response.py
sieht so aus:
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
Der Abschnitt default_args
enthält die Standardargumente, die für das BaseOperator-Modell in Apache Airflow erforderlich sind. Dieser Abschnitt würde mit diesen Parametern in jedem Apache Airflow-DAG angezeigt werden. Für owner
ist momentan Composer Example
festgelegt, aber du kannst diesen Namen in deinen Namen ändern. depends_on_past
zeigt, dass dieser DAG nicht von vorherigen DAGs abhängig ist. Die drei E-Mail-Bereiche email
, email_on_failure
und email_on_retry
sind so festgelegt, dass keine E-Mail-Benachrichtigungen basierend auf dem Status dieses DAG eingehen. Der DAG wiederholt den Vorgang nur einmal, da retries
auf 1 gesetzt ist, und zwar nach fünf Minuten pro retry_delay
. start_date
gibt normalerweise vor, wann ein DAG in Verbindung mit seinem (später festgelegten) schedule_interval
ausgeführt werden soll. Im Fall dieses DAG ist dies jedoch nicht relevant. Als Datum ist der 1. Januar 2017 festgelegt, es kann aber auch ein Datum in der Vergangenheit liegen.
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
Im Bereich with airflow.DAG
wird der auszuführende DAG konfiguriert. Er wird mit der Aufgaben-ID composer_sample_trigger_response_dag
, den Standardargumenten aus dem Abschnitt default_args
und, vor allem mit dem schedule_interval
None
, ausgeführt. schedule_interval
ist auf None
gesetzt, da wir diesen bestimmten DAG mit unserer Cloud Functions-Funktion auslösen. Aus diesem Grund ist die start_date
auf default_args
nicht relevant.
Bei der Ausführung gibt der DAG seine Konfiguration aus, die in der Variablen print_gcs_info
festgelegt ist.
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. Funktion testen
Öffnen Sie die Composer-Umgebung und klicken Sie in der Zeile mit dem Umgebungsnamen auf den Airflow-Link. | |
Öffnen Sie die | |
Öffnen Sie einen separaten Tab und laden Sie eine Datei in den Cloud Storage-Bucket hoch, den Sie zuvor erstellt und als Trigger für Ihre Cloud Functions-Funktion angegeben haben. Dazu können Sie die Console oder den gsutil-Befehl verwenden. | |
Gehen Sie zurück zum Tab mit der Airflow-Benutzeroberfläche und klicken Sie auf „Graph View“ (Diagrammansicht). | |
Klicken Sie auf die Aufgabe | |
Klicken Sie auf „Protokoll anzeigen“. oben rechts im Menü | |
Die Logs enthalten Informationen zu der Datei, die Sie in Ihren Cloud Storage-Bucket hochgeladen haben. |
Glückwunsch! Sie haben soeben einen Airflow-DAG mit Node.js und Google Cloud Functions ausgelöst.
7. Bereinigen
So vermeiden Sie, dass Ihrem GCP-Konto die in dieser Kurzanleitung verwendeten Ressourcen in Rechnung gestellt werden:
- Optional: Laden Sie die Daten aus dem Cloud Storage-Bucket für die Cloud Composer-Umgebung und aus dem Storage-Bucket herunter, den Sie für diese Kurzanleitung erstellt haben, um Ihre Daten zu speichern.
- Löschen Sie den Cloud Storage-Bucket für die Umgebung und den von Ihnen erstellten Bucket.
- Löschen Sie die Cloud Composer-Umgebung. Der Storage-Bucket für die Umgebung wird durch das Löschen der Umgebung nicht gelöscht.
- (Optional) Beim serverlosen Computing sind die ersten 2 Millionen Aufrufe pro Monat kostenlos. Wenn Sie die Funktion auf null skalieren, fallen keine Kosten an. Weitere Informationen finden Sie unter Preise. Wenn Sie Ihre Cloud Functions-Funktion jedoch löschen möchten, klicken Sie auf „LÖSCHEN“. oben rechts auf der Übersichtsseite der Funktion
Optional können Sie das Projekt auch löschen:
- Rufen Sie in der GCP Console die Seite Projekte auf.
- Wählen Sie in der Projektliste das Projekt aus, das Sie löschen möchten, und klicken Sie auf Löschen.
- Geben Sie die Projekt-ID in das Feld ein und klicken Sie auf Beenden, um das Projekt zu löschen.