1. مقدمه
Apache Airflow برای اجرای DAG ها در یک برنامه منظم طراحی شده است، اما شما همچنین می توانید DAG ها را در پاسخ به رویدادهایی مانند تغییر در یک سطل ذخیره سازی ابری یا ارسال پیام به Cloud Pub/Sub فعال کنید. برای انجام این کار، DAG های Cloud Composer را می توان توسط Cloud Functions راه اندازی کرد.
مثال در این آزمایشگاه هر بار که تغییری در یک سطل ذخیرهسازی ابری رخ میدهد، یک DAG ساده را اجرا میکند. این DAG از BashOperator برای اجرای دستور bash استفاده میکند که اطلاعات تغییر را در مورد آنچه در سطل ذخیرهسازی ابری آپلود شده است چاپ میکند.
قبل از شروع این آزمایشگاه، توصیه می شود مقدمه ای برای Cloud Composer و شروع به کار با کدهای Cloud Functions را تکمیل کنید. اگر یک محیط Composer را در Intro to Cloud Composer codelab ایجاد کنید، می توانید از آن محیط در این آزمایشگاه استفاده کنید.
آنچه شما خواهید ساخت
در این نرم افزار کد، شما:
- یک فایل در Google Cloud Storage آپلود کنید، که این کار را انجام خواهد داد
- یک تابع Google Cloud را با استفاده از زمان اجرا Node.JS راه اندازی کنید
- این تابع یک DAG را در Google Cloud Composer اجرا می کند
- این یک دستور bash ساده را اجرا میکند که تغییر را در سطل Google Cloud Storage چاپ میکند
آنچه شما یاد خواهید گرفت
- نحوه راه اندازی Apache Airflow DAG با استفاده از Google Cloud Functions + Node.js
آنچه شما نیاز دارید
- حساب GCP
- درک اولیه جاوا اسکریپت
- دانش اولیه Cloud Composer/Airflow و Cloud Functions
- راحتی با استفاده از دستورات CLI
2. راه اندازی GCP
پروژه را انتخاب یا ایجاد کنید
یک پروژه Google Cloud Platform را انتخاب یا ایجاد کنید. اگر در حال ایجاد یک پروژه جدید هستید، مراحل موجود در اینجا را دنبال کنید.
شناسه پروژه خود را یادداشت کنید که در مراحل بعدی از آن استفاده خواهید کرد.
اگر در حال ایجاد یک پروژه جدید هستید، شناسه پروژه درست در زیر نام پروژه در صفحه ایجاد پیدا می شود | |
اگر قبلاً پروژه ای ایجاد کرده اید، می توانید شناسه را در صفحه اصلی کنسول در کارت اطلاعات پروژه پیدا کنید |
API ها را فعال کنید
ایجاد محیط آهنگساز
یک محیط Cloud Composer با پیکربندی زیر ایجاد کنید :
تمام تنظیمات دیگر می توانند در حالت پیش فرض خود باقی بمانند. روی "ایجاد" در پایین کلیک کنید. نام و مکان Composer Environment خود را یادداشت کنید - در مراحل بعدی به آنها نیاز خواهید داشت. |
ایجاد سطل ذخیره سازی ابری
در پروژه خود، یک سطل ذخیره سازی ابری با پیکربندی زیر ایجاد کنید :
وقتی آماده شدید، «ایجاد» را فشار دهید. مطمئن شوید که نام سطل فضای ذخیرهسازی ابری خود را برای مراحل بعدی یادداشت کردهاید. |
3. راه اندازی توابع ابری Google (GCF)
برای راه اندازی GCF، دستورات را در Google Cloud Shell اجرا خواهیم کرد.
در حالی که Google Cloud را میتوان از راه دور از لپتاپ با استفاده از ابزار خط فرمان gcloud اداره کرد، در این کد لبه از Google Cloud Shell استفاده میکنیم، یک محیط خط فرمان که در Cloud اجرا میشود.
این ماشین مجازی مبتنی بر دبیان با تمام ابزارهای توسعه که شما نیاز دارید بارگذاری شده است. این یک فهرست اصلی 5 گیگابایتی دائمی را ارائه می دهد و در Google Cloud اجرا می شود و عملکرد و احراز هویت شبکه را بسیار افزایش می دهد. این بدان معنی است که تمام چیزی که برای این کد لبه نیاز دارید یک مرورگر است (بله، روی کروم بوک کار می کند).
برای فعال کردن Google Cloud Shell، از کنسول توسعهدهنده روی دکمه سمت راست بالا کلیک کنید (تنها چند لحظه طول میکشد تا محیط را تهیه کرده و به آن متصل شوید): |
مجوزهای امضای blob را به حساب خدمات Cloud Functions اعطا کنید
برای اینکه GCF در Cloud IAP ، پروکسی که از وب سرور Airflow محافظت میکند، احراز هویت کند، باید به حساب Appspot Service GCF نقش Service Account Token Creator
را اعطا کنید. این کار را با اجرای دستور زیر در Cloud Shell خود انجام دهید و نام پروژه خود را با <your-project-id>
جایگزین کنید.
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
به عنوان مثال، اگر پروژه شما my-project
نام داشته باشد، دستور شما خواهد بود
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
دریافت شناسه مشتری
برای ساختن یک توکن برای احراز هویت در Cloud IAP ، این تابع به شناسه مشتری پروکسی نیاز دارد که از وب سرور Airflow محافظت می کند. Cloud Composer API این اطلاعات را مستقیماً ارائه نمی کند. در عوض، یک درخواست احراز هویت نشده به وب سرور Airflow ارسال کنید و شناسه مشتری را از URL تغییر مسیر بگیرید. ما این کار را با اجرای یک فایل پایتون با استفاده از Cloud Shell برای گرفتن شناسه مشتری انجام خواهیم داد.
با اجرای دستور زیر در Cloud Shell خود کد لازم را از GitHub دانلود کنید
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
اگر به دلیل وجود این دایرکتوری با خطا مواجه شدید، با اجرای دستور زیر آن را به آخرین نسخه به روز کنید
cd python-docs-samples/ git pull origin master
با اجرا به دایرکتوری مناسب تغییر دهید
cd python-docs-samples/composer/rest
کد پایتون را برای دریافت شناسه مشتری خود اجرا کنید و نام پروژه خود را به جای <your-project-id>
، مکان محیط Composer که قبلاً برای <your-composer-location>
ایجاد کردید و نام محیط Composer را جایگزین کنید. قبلا برای <your-composer-environment>
ایجاد شده است
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
برای مثال، اگر نام پروژه شما my-project
، مکان Composer شما us-central1
و نام محیط شما my-composer
باشد، دستور شما این خواهد بود.
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py
کارهای زیر را انجام می دهد:
- با Google Cloud احراز هویت می شود
- برای دریافت URI تغییر مسیر، یک درخواست HTTP تأیید نشده به سرور وب Airflow میکند.
- پارامتر پرس و جو
client_id
را از آن تغییر مسیر استخراج می کند - آن را برای استفاده شما چاپ می کند
شناسه مشتری شما در خط فرمان چاپ می شود و چیزی شبیه به این خواهد بود:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. عملکرد خود را ایجاد کنید
در Cloud Shell خود، با اجرا کردن، مخزن را با کد نمونه لازم کلون کنید
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
دایرکتوری لازم را تغییر دهید و در حین انجام چند مرحله بعدی، Cloud Shell خود را باز بگذارید
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
با کلیک بر روی منوی ناوبری و سپس کلیک بر روی "توابع ابری" به صفحه Google Cloud Functions بروید. | |
در بالای صفحه بر روی "CREATE FUNCTION" کلیک کنید | |
نام تابع خود را "my-function" بگذارید و حافظه را روی حالت پیش فرض، 256 مگابایت بگذارید. | |
Trigger را روی "Cloud Storage" تنظیم کنید، نوع رویداد را به عنوان "Finalize/Create" بگذارید و به سطلی که در مرحله Create a Cloud Storage Bucket ایجاد کردید، بروید. | |
کد منبع را روی "Inline Editor" بگذارید و زمان اجرا را روی "Node.js 8" تنظیم کنید. |
در Cloud Shell خود دستور زیر را اجرا کنید. با این کار index.js و package.json در ویرایشگر پوسته ابری باز می شود
cloudshell edit index.js package.json
روی تب package.json کلیک کنید، آن کد را کپی کرده و در قسمت package.json ویرایشگر درون خطی توابع ابری قرار دهید. | |
"Function to Execute" را روی triggerDag تنظیم کنید | |
روی تب index.js کلیک کنید، کد را کپی کنید و آن را در بخش index.js ویرایشگر درون خطی Cloud Functions قرار دهید. | |
|
در Cloud Shell خود، دستور زیر را اجرا کنید و <your-environment-name> را با نام محیط Composer خود و <your-composer-region> را با منطقه ای که محیط Composer شما در آن قرار دارد جایگزین کنید.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
به عنوان مثال، اگر محیط شما my-composer-environment
نام دارد و در us-central1
قرار دارد دستور شما به این صورت خواهد بود.
gcloud composer environments describe my-composer-environment --location us-central1
خروجی باید چیزی شبیه به این باشد:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
در آن خروجی، به دنبال متغیری به نام | |
روی پیوند کشویی «بیشتر» کلیک کنید، سپس نزدیکترین منطقه جغرافیایی به خود را انتخاب کنید | |
"امتحان مجدد در صورت شکست" را علامت بزنید | |
برای ایجاد Cloud Function خود روی "Create" کلیک کنید |
گام برداشتن از طریق کد
کدی که از index.js کپی کردید چیزی شبیه به این خواهد بود:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
بیایید نگاهی بیندازیم که چه خبر است. سه تابع در اینجا وجود دارد: triggerDag
، authorizeIap
، و makeIapPostRequest
triggerDag
تابعی است که وقتی چیزی را در سطل ذخیره سازی ابری تعیین شده آپلود می کنیم، راه اندازی می شود. اینجاست که ما متغیرهای مهم مورد استفاده در سایر درخواستها را پیکربندی میکنیم، مانند PROJECT_ID
، CLIENT_ID
، WEBSERVER_ID
، و DAG_NAME
. authorizeIap
و makeIapPostRequest
را فراخوانی می کند.
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap
با استفاده از یک حساب سرویس و "مبادله" JWT برای شناسه شناسه ای که برای احراز هویت makeIapPostRequest
استفاده می شود، درخواستی از پروکسی می کند که از وب سرور Airflow محافظت می کند.
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest
با وب سرور Airflow تماس می گیرد تا composer_sample_trigger_response_dag.
نام DAG در URL وب سرور Airflow که با پارامتر url
ارسال شده است، جاسازی شده است، و idToken
نشانه ای است که در درخواست authorizeIap
به دست آوردیم.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. DAG خود را راه اندازی کنید
در Cloud Shell خود، به دایرکتوری با نمونه گردش کار تغییر دهید. این بخشی از نمونههای python-docs است که از GitHub در مرحله Getting the Client Id دانلود کردهاید.
cd cd python-docs-samples/composer/workflows
DAG را در Composer آپلود کنید
DAG نمونه را با دستور زیر در سطل ذخیره سازی DAG محیط Composer خود آپلود کنید، جایی که <environment_name>
نام محیط Composer شما و <location>
نام منطقه ای است که در آن قرار دارد. trigger_response_dag.py
DAG است که ما با آن کار خواهیم کرد.
gcloud composer environments storage dags import \ --environment <environment_name> \ --location <location> \ --source trigger_response_dag.py
برای مثال، اگر محیط Composer شما my-composer
نام داشت و در us-central1
قرار داشت، دستور شما به این صورت خواهد بود.
gcloud composer environments storage dags import \ --environment my-composer \ --location us-central1 \ --source trigger_response_dag.py
قدم گذاشتن از طریق DAG
کد DAG در trigger_response.py
شبیه این است
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
بخش default_args
حاوی آرگومانهای پیشفرض مطابق با مدل BaseOperator در Apache Airflow است. این بخش را با این پارامترها در هر Apache Airflow DAG مشاهده خواهید کرد. owner
در حال حاضر روی Composer Example
تنظیم شده است، اما در صورت تمایل می توانید نام خود را تغییر دهید. depends_on_past
به ما نشان می دهد که این DAG به هیچ DAG قبلی وابسته نیست. سه بخش ایمیل، email
، email_on_failure
، و email_on_retry
به گونه ای تنظیم شده اند که هیچ اعلان ایمیلی بر اساس وضعیت این DAG وارد نشود. DAG فقط یک بار امتحان می کند، زیرا retries
روی 1 تنظیم شده است، و این کار را پس از پنج دقیقه، در هر retry_delay
انجام می دهد. start_date
معمولاً دیکته میکند که یک DAG چه زمانی باید اجرا شود، در رابطه با schedule_interval
آن (تعداد بعدی) اما در مورد این DAG، مربوط نیست. برای تاریخ 1 ژانویه 2017 تنظیم شده است، اما می تواند در هر تاریخ گذشته تنظیم شود.
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
بخش with airflow.DAG
DAG را که اجرا خواهد شد پیکربندی می کند. با شناسه وظیفه composer_sample_trigger_response_dag
، آرگومانهای پیشفرض از بخش default_args
، و مهمتر از همه، با یک schedule_interval
None
اجرا میشود. schedule_interval
روی None
تنظیم شده است زیرا ما این DAG خاص را با عملکرد Cloud خود راه اندازی می کنیم. به همین دلیل است که start_date
در default_args
مرتبط نیست.
هنگامی که اجرا می شود، DAG پیکربندی خود را همانطور که در متغیر print_gcs_info
دیکته شده است، چاپ می کند.
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. عملکرد خود را آزمایش کنید
Composer Environment خود را باز کنید و در ردیف با نام محیط خود، روی پیوند Airflow کلیک کنید | |
| |
یک برگه جداگانه باز کنید و هر فایلی را در سطل Cloud Storage که قبلا ایجاد کرده اید و به عنوان محرک عملکرد Cloud خود مشخص کرده اید، آپلود کنید. می توانید این کار را از طریق کنسول یا با استفاده از دستور gsutil انجام دهید. | |
با رابط کاربری Airflow خود به برگه برگردید و روی Graph View کلیک کنید | |
روی کار | |
روی "View Log" در سمت راست بالای منو کلیک کنید | |
در گزارشها، اطلاعاتی درباره فایلی که در سطل فضای ذخیرهسازی ابری خود آپلود کردهاید، مشاهده خواهید کرد. |
تبریک می گویم! شما به تازگی یک Airflow DAG را با استفاده از Node.js و Google Cloud Functions راه اندازی کرده اید!
7. پاکسازی
برای جلوگیری از تحمیل هزینه به حساب GCP خود برای منابع استفاده شده در این شروع سریع:
- (اختیاری) برای ذخیره داده های خود، داده ها را از سطل ذخیره سازی Cloud برای محیط Cloud Composer و سطل ذخیره سازی که برای این شروع سریع ایجاد کرده اید دانلود کنید .
- سطل Cloud Storage را برای محیطی که ایجاد کرده اید حذف کنید
- محیط Cloud Composer را حذف کنید . توجه داشته باشید که حذف محیط شما، سطل ذخیره سازی محیط را حذف نمی کند.
- (اختیاری) با محاسبات بدون سرور، 2 میلیون فراخوان اول در ماه رایگان است، و هنگامی که عملکرد خود را صفر کنید، هزینه ای از شما دریافت نمی شود (برای جزئیات بیشتر به قیمت مراجعه کنید). با این حال، اگر میخواهید عملکرد Cloud خود را حذف کنید، این کار را با کلیک کردن روی «DELETE» در سمت راست بالای صفحه نمای کلی برای عملکرد خود انجام دهید.
همچنین می توانید به صورت اختیاری پروژه را حذف کنید:
- در کنسول GCP، به صفحه پروژه ها بروید.
- در لیست پروژه، پروژه ای را که می خواهید حذف کنید انتخاب کنید و روی Delete کلیک کنید.
- در کادر، ID پروژه را تایپ کنید و سپس بر روی Shut down کلیک کنید تا پروژه حذف شود.