1. Giới thiệu
Apache Airflow được thiết kế để chạy DAG theo lịch trình thường xuyên, nhưng bạn cũng có thể kích hoạt DAG theo các sự kiện, chẳng hạn như thay đổi trong bộ chứa Cloud Storage hoặc khi có thông báo được đẩy lên Cloud Pub/Sub. Để thực hiện việc này, có thể kích hoạt DAG của Cloud Composer bằng Cloud Functions.
Ví dụ trong phòng thí nghiệm này chạy một DAG đơn giản mỗi khi có thay đổi xảy ra trong bộ chứa Cloud Storage. DAG này sử dụng BashOperator để chạy một lệnh bash in thông tin thay đổi về nội dung đã được tải lên bộ chứa Cloud Storage.
Trước khi bắt đầu phòng thí nghiệm này, bạn nên hoàn thành các lớp học lập trình Giới thiệu về Cloud Composer và Bắt đầu sử dụng Hàm đám mây. Nếu tạo một Môi trường Compose trong lớp học lập trình Giới thiệu về Cloud Composer, thì bạn có thể sử dụng môi trường đó trong phòng thí nghiệm này.
Sản phẩm bạn sẽ tạo ra
Trong lớp học lập trình này, bạn sẽ:
- Tải một tệp lên Google Cloud Storage. Thao tác này sẽ
- Kích hoạt Google Cloud Function bằng thời gian chạy Node.JS
- Hàm này sẽ thực thi một DAG trong Google Cloud Composer
- Thao tác này chạy một lệnh bash đơn giản để in nội dung thay đổi sang bộ chứa Google Cloud Storage
Kiến thức bạn sẽ học được
- Cách kích hoạt DAG Apache Airflow bằng Google Cloud Functions + Node.js
Bạn cần
- Tài khoản GCP
- Hiểu biết cơ bản về JavaScript
- Kiến thức cơ bản về Cloud Composer/Airflow và Cloud Functions
- Điều chỉnh mức độ thoải mái bằng các lệnh CLI
2. Thiết lập GCP
Chọn hoặc Tạo dự án
Chọn hoặc tạo một Dự án Google Cloud Platform. Nếu bạn tạo một dự án mới, hãy làm theo các bước tại đây.
Ghi lại Mã dự án của bạn để sử dụng trong các bước sau.
Nếu đang tạo một dự án mới, bạn sẽ thấy mã dự án ngay bên dưới Tên dự án trên trang tạo | |
Nếu đã tạo một dự án, bạn có thể tìm thấy mã này trên trang chủ của bảng điều khiển trong thẻ Thông tin dự án |
Bật các API
Tạo môi trường Compose
Tạo môi trường Cloud Composer với cấu hình sau:
Có thể giữ nguyên tất cả cấu hình khác theo mặc định. Nhấp vào "Tạo" ở dưới cùng.Ghi lại tên và vị trí Môi trường của Trình soạn thảo - bạn sẽ cần chúng trong các bước sau này. |
Tạo bộ chứa Cloud Storage
Trong dự án của bạn, hãy tạo một bộ chứa Cloud Storage với cấu hình sau:
Nhấn "Tạo" khi đã sẵn sàng Hãy nhớ ghi lại tên bộ chứa Cloud Storage của bạn để thực hiện các bước sau. |
3. Thiết lập Google Cloud Functions (GCF)
Để thiết lập GCF, chúng ta sẽ chạy các lệnh trong Google Cloud Shell.
Mặc dù bạn có thể vận hành Google Cloud từ xa trên máy tính xách tay bằng công cụ dòng lệnh gcloud, nhưng trong lớp học lập trình này, chúng ta sẽ sử dụng Google Cloud Shell, một môi trường dòng lệnh chạy trên Đám mây.
Máy ảo dựa trên Debian này được tải tất cả các công cụ phát triển mà bạn cần. Dịch vụ này cung cấp thư mục gốc có dung lượng ổn định 5 GB và chạy trên Google Cloud, giúp nâng cao đáng kể hiệu suất và khả năng xác thực của mạng. Tức là tất cả những gì bạn cần để thực hiện lớp học lập trình này là một trình duyệt (vâng, trình duyệt này hoạt động trên Chromebook).
Để kích hoạt Google Cloud Shell, trong bảng điều khiển dành cho nhà phát triển, hãy nhấp vào nút ở trên cùng bên phải (chỉ mất vài phút để cấp phép và kết nối với môi trường): |
Cấp quyền ký blob vào Tài khoản dịch vụ Cloud Functions
Để GCF xác thực với Cloud IAP (máy chủ proxy bảo vệ máy chủ web của Airflow), bạn cần cấp cho GCF (tài khoản dịch vụ Appspot) vai trò Service Account Token Creator
. Bạn có thể thực hiện việc này bằng cách chạy lệnh sau trong Cloud Shell, thay thế tên dự án bằng <your-project-id>
.
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Ví dụ: nếu dự án của bạn có tên là my-project
, lệnh sẽ là
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Lấy mã ứng dụng khách
Để tạo mã thông báo nhằm xác thực IAP trên Cloud, hàm này sẽ yêu cầu mã ứng dụng khách của proxy giúp bảo vệ máy chủ web của Airflow. Cloud Composer API không trực tiếp cung cấp thông tin này. Thay vào đó, hãy gửi một yêu cầu chưa được xác thực đến máy chủ web Airflow và thu thập mã ứng dụng khách từ URL chuyển hướng. Chúng ta sẽ thực hiện việc đó bằng cách chạy một tệp python bằng Cloud Shell để thu thập mã ứng dụng khách.
Tải mã cần thiết từ GitHub xuống bằng cách chạy lệnh sau trong Cloud Shell
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
Nếu bạn gặp lỗi do thư mục này đã tồn tại, hãy cập nhật thư mục lên phiên bản mới nhất bằng cách chạy lệnh sau
cd python-docs-samples/ git pull origin master
Thay đổi thư mục thích hợp bằng cách chạy
cd python-docs-samples/composer/rest
Chạy mã python để lấy mã ứng dụng khách của bạn, thay thế tên dự án cho <your-project-id>
, vị trí của môi trường Composer mà bạn đã tạo trước đó cho <your-composer-location>
và tên của môi trường Composer mà bạn đã tạo trước đó cho <your-composer-environment>
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
Ví dụ: nếu tên dự án là my-project
, vị trí Trình soạn thảo của bạn là us-central1
và tên môi trường là my-composer
, thì lệnh của bạn sẽ là
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py
sẽ thực hiện những việc sau:
- Xác thực bằng Google Cloud
- Gửi một yêu cầu HTTP chưa được xác thực đến máy chủ web Airflow để nhận URI chuyển hướng
- Trích xuất tham số truy vấn
client_id
từ lệnh chuyển hướng đó - In để bạn sử dụng
Client-ID của bạn sẽ được in ra dòng lệnh và có dạng như sau:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. Tạo hàm
Trong Cloud Shell, hãy sao chép kho lưu trữ với mã mẫu cần thiết bằng cách chạy
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
Hãy chuyển sang thư mục cần thiết và để Cloud Shell mở trong khi bạn hoàn tất một số bước tiếp theo
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
Chuyển đến trang Google Cloud Functions bằng cách nhấp vào Trình đơn điều hướng rồi nhấp vào "Cloud Functions" | |
Nhấp vào "TẠO CHỨC NĂNG" ở đầu trang | |
Đặt tên cho hàm là "hàm của tôi" và để bộ nhớ theo mặc định là 256 MB. | |
Đặt Điều kiện kích hoạt thành "Cloud Storage", để loại Sự kiện là "Hoàn tất/Tạo" và duyệt đến bộ chứa bạn đã tạo ở bước Tạo bộ chứa Cloud Storage. | |
Đặt Mã nguồn thành "Trình chỉnh sửa cùng dòng" và đặt thời gian chạy thành "Node.js 8" |
Trong Cloud Shell, hãy chạy lệnh sau. Thao tác này sẽ mở http://index.js và package.json trong Cloud Shell Editor (Trình chỉnh sửa Cloud Shell)
cloudshell edit index.js package.json
Nhấp vào thẻ package.json, sao chép và dán mã đó vào phần package.json của trình chỉnh sửa nội tuyến Cloud Functions | |
Đặt "Hàm để thực thi" để kích hoạtDag | |
Nhấp vào tabindex.js, sao chép mã và dán mã đó vào phầnindex.js của trình chỉnh sửa nội tuyến Hàm đám mây | |
Thay đổi |
Trong Cloud Shell, hãy chạy lệnh sau, thay thế <your-environment-name> bằng tên môi trường Composer và <your-composer-region> của bạn bằng khu vực đặt Môi trường Composer.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
Ví dụ: nếu môi trường của bạn có tên là my-composer-environment
và nằm trong us-central1
, lệnh của bạn sẽ là
gcloud composer environments describe my-composer-environment --location us-central1
Kết quả đầu ra sẽ có dạng như sau:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
Trong kết quả đó, hãy tìm biến có tên | |
Nhấp vào nút "Thêm" đường liên kết thả xuống, sau đó chọn Khu vực địa lý gần bạn nhất | |
Chọn "Thử lại khi không thành công" | |
Nhấp vào "Tạo" để tạo Hàm đám mây |
Khám phá mã
Mã mà bạn đã sao chép từindex.js sẽ trông giống như sau:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
Hãy xem điều gì đang xảy ra. Có 3 hàm ở đây: triggerDag
, authorizeIap
và makeIapPostRequest
triggerDag
là hàm được kích hoạt khi chúng ta tải nội dung nào đó lên bộ chứa Cloud Storage được chỉ định. Đây là nơi chúng ta định cấu hình các biến quan trọng dùng trong các yêu cầu khác, chẳng hạn như PROJECT_ID
, CLIENT_ID
, WEBSERVER_ID
và DAG_NAME
. Phương thức này gọi authorizeIap
và makeIapPostRequest
.
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap
gửi yêu cầu tới proxy bảo vệ máy chủ web Airflow bằng tài khoản dịch vụ và "trao đổi" JWT cho mã thông báo nhận dạng sẽ được dùng để xác thực makeIapPostRequest
.
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest
thực hiện lệnh gọi đến máy chủ web của Airflow để kích hoạt composer_sample_trigger_response_dag.
. Tên DAG được nhúng trong URL máy chủ web của Airflow được truyền vào cùng với tham số url
và idToken
là mã thông báo mà chúng tôi nhận được trong yêu cầu authorizeIap
.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. Thiết lập DAG
Trong Cloud Shell, hãy thay đổi sang thư mục có quy trình làm việc mẫu. Đây là một phần của python-docs-samples mà bạn đã tải xuống từ GitHub trong bước Lấy mã ứng dụng khách.
cd cd python-docs-samples/composer/workflows
Tải DAG lên Composer
Tải DAG mẫu lên bộ chứa lưu trữ DAG của môi trường Composer bằng lệnh sau, trong đó <environment_name>
là tên của môi trường Composer và <location>
là tên của khu vực nơi chứa DAG đó. trigger_response_dag.py
là DAG mà chúng tôi sẽ làm việc cùng.
gcloud composer environments storage dags import \ --environment <environment_name> \ --location <location> \ --source trigger_response_dag.py
Ví dụ: nếu môi trường Trình soạn thảo của bạn có tên là my-composer
và nằm trong us-central1
, lệnh của bạn sẽ là
gcloud composer environments storage dags import \ --environment my-composer \ --location us-central1 \ --source trigger_response_dag.py
Vượt qua DAG
Mã DAG trong trigger_response.py
có dạng như sau
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
Phần default_args
chứa các đối số mặc định theo yêu cầu của mô hình BaseOperator trong Apache Airflow. Bạn sẽ thấy mục này cùng với các tham số này trong mọi DAG của Apache Airflow. owner
hiện được đặt thành Composer Example
, nhưng bạn có thể thay đổi thành tên của mình nếu muốn. depends_on_past
cho thấy rằng DAG này không phụ thuộc vào bất kỳ DAG nào trước đó. Ba phần của email, email
, email_on_failure
và email_on_retry
được thiết lập để không có thông báo qua email nào được gửi dựa trên trạng thái của DAG này. DAG sẽ chỉ thử lại một lần, vì retries
được đặt thành 1 và sẽ thực hiện lại sau 5 phút, theo retry_delay
. start_date
thường quy định thời điểm nên chạy DAG, cùng với schedule_interval
(thiết lập sau) nhưng không phù hợp đối với DAG này. Ngày này được đặt là ngày 1 tháng 1 năm 2017, nhưng bạn có thể đặt thành bất kỳ ngày nào trong quá khứ.
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
Phần with airflow.DAG
định cấu hình DAG sẽ được chạy. Thao tác này sẽ chạy bằng mã tác vụ composer_sample_trigger_response_dag
, các đối số mặc định trong phần default_args
và quan trọng nhất là với schedule_interval
là None
. schedule_interval
được đặt thành None
vì chúng ta đang kích hoạt DAG cụ thể này bằng Chức năng đám mây của mình. Đây là lý do tại sao start_date
trong default_args
không liên quan.
Khi thực thi, DAG sẽ in cấu hình như quy định trong biến print_gcs_info
.
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. Kiểm tra chức năng
Mở Composer Environment (Môi trường Composer) và trong hàng có tên môi trường, hãy nhấp vào đường liên kết Airflow | |
Mở | |
Mở một thẻ riêng và tải tệp bất kỳ lên bộ chứa Cloud Storage mà bạn đã tạo trước đó và chỉ định làm điều kiện kích hoạt cho Chức năng đám mây của bạn. Bạn có thể thực hiện việc này qua Bảng điều khiển hoặc sử dụng lệnh YT. | |
Quay lại thẻ có giao diện người dùng Airflow rồi nhấp vào Chế độ xem biểu đồ | |
Nhấp vào nhiệm vụ | |
Nhấp vào "Xem nhật ký" ở góc trên bên phải trình đơn | |
Trong nhật ký, bạn sẽ thấy thông tin về tệp mà bạn đã tải lên bộ chứa Cloud Storage. |
Xin chúc mừng! Bạn vừa kích hoạt một DAG của Airflow bằng Node.js và Google Cloud Functions!
7. Dọn dẹp
Để tránh bị tính phí vào tài khoản GCP đối với các tài nguyên được sử dụng trong phần khởi động nhanh này, hãy làm như sau:
- (Không bắt buộc) Để lưu dữ liệu của bạn, hãy tải dữ liệu xuống từ bộ chứa Cloud Storage cho môi trường Cloud Composer và bộ chứa lưu trữ bạn đã tạo cho bước bắt đầu nhanh này.
- Xoá bộ chứa Cloud Storage đối với môi trường và bộ chứa bạn đã tạo
- Xoá môi trường Cloud Composer. Xin lưu ý rằng việc xoá môi trường sẽ không xoá bộ chứa lưu trữ của môi trường đó.
- (Không bắt buộc) Với điện toán không máy chủ, 2 triệu lệnh gọi đầu tiên mỗi tháng là miễn phí và khi mở rộng quy mô hàm về 0, bạn sẽ không bị tính phí (xem giá để biết thêm chi tiết). Tuy nhiên, nếu bạn muốn xoá Hàm đám mây của mình, hãy nhấp vào "XOÁ" ở trên cùng bên phải trang tổng quan cho hàm của bạn
Bạn cũng có thể tuỳ ý xoá dự án:
- Trong Bảng điều khiển GCP, hãy chuyển đến trang Dự án.
- Trong danh sách dự án, hãy chọn dự án mà bạn muốn xoá rồi nhấp vào Xoá.
- Trong hộp này, hãy nhập mã dự án rồi nhấp vào Tắt để xoá dự án.