1. Введение
Apache Airflow предназначен для запуска групп DAG по регулярному расписанию, но вы также можете запускать группы DAG в ответ на такие события, как изменение в сегменте Cloud Storage или сообщение, отправленное в Cloud Pub/Sub. Для этого группы DAG Cloud Composer могут запускаться с помощью Cloud Functions .
В примере этого лабораторного занятия простая группа обеспечения доступности баз данных запускается каждый раз, когда происходит изменение в сегменте Cloud Storage. Эта группа обеспечения доступности баз данных использует BashOperator для запуска команды bash, печатающей информацию об изменении того, что было загружено в корзину Cloud Storage.
Прежде чем приступить к выполнению лабораторной работы, рекомендуется выполнить лабораторные работы по написанию кода «Введение в Cloud Composer» и «Начало работы с облачными функциями» . Если вы создадите среду Composer в лабораторной работе «Введение в Cloud Composer», вы сможете использовать эту среду в этой лабораторной работе.
Что вы построите
В этой кодовой лаборатории вы:
- Загрузите файл в облачное хранилище Google , которое будет
- Запуск функции Google Cloud с использованием среды выполнения Node.JS
- Эта функция выполнит группу обеспечения доступности баз данных в Google Cloud Composer.
- При этом выполняется простая команда bash, печатающая изменения в корзине Google Cloud Storage.
Что вы узнаете
- Как запустить Apache Airflow DAG с помощью Google Cloud Functions + Node.js
Что вам понадобится
- Аккаунт GCP
- Базовое понимание Javascript
- Базовые знания Cloud Composer/Airflow и облачных функций.
- Комфортное использование команд CLI
2. Настройка GCP
Выберите или создайте проект
Выберите или создайте проект Google Cloud Platform. Если вы создаете новый проект, выполните действия, описанные здесь .
Запишите свой идентификатор проекта, который вы будете использовать на последующих этапах.
Если вы создаете новый проект, идентификатор проекта находится прямо под именем проекта на странице создания. | |
Если вы уже создали проект, вы можете найти его идентификатор на главной странице консоли в карточке «Информация о проекте». |
Включите API
Создать среду композитора
Создайте среду Cloud Composer со следующей конфигурацией:
Все остальные конфигурации могут оставаться по умолчанию. Нажмите «Создать» внизу. Запишите имя и местоположение вашей среды Composer — они понадобятся вам в будущих шагах. |
Создать сегмент облачного хранилища
В своем проекте создайте корзину Cloud Storage со следующей конфигурацией:
Когда будете готовы, нажмите «Создать». Обязательно запишите имя своего сегмента облачного хранилища для дальнейших действий. |
3. Настройка облачных функций Google (GCF)
Чтобы настроить GCF, мы будем запускать команды в Google Cloud Shell.
Хотя Google Cloud можно управлять удаленно с вашего ноутбука с помощью инструмента командной строки gcloud , в этой лаборатории мы будем использовать Google Cloud Shell , среду командной строки, работающую в облаке.
Эта виртуальная машина на базе Debian оснащена всеми необходимыми инструментами разработки. Он предлагает постоянный домашний каталог объемом 5 ГБ и работает в облаке Google, что значительно повышает производительность сети и аутентификацию. Это означает, что все, что вам понадобится для этой лаборатории кода, — это браузер (да, он работает на Chromebook).
Чтобы активировать Google Cloud Shell, в консоли разработчика нажмите кнопку в правом верхнем углу (подготовка и подключение к среде займет всего несколько минут): |
Предоставьте разрешения на подпись BLOB-объектов учетной записи службы облачных функций.
Чтобы GCF мог аутентифицироваться в Cloud IAP , прокси-сервере, который защищает веб-сервер Airflow, вам необходимо предоставить учетной записи службы Appspot GCF роль Service Account Token Creator
. Для этого выполните следующую команду в Cloud Shell, заменив <your-project-id>
именем вашего проекта.
gcloud iam service-accounts add-iam-policy-binding \ <your-project-id>@appspot.gserviceaccount.com \ --member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Например, если ваш проект называется my-project
, ваша команда будет такой:
gcloud iam service-accounts add-iam-policy-binding \ my-project@appspot.gserviceaccount.com \ --member=serviceAccount:my-project@appspot.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
Получение идентификатора клиента
Чтобы создать токен для аутентификации в Cloud IAP , функции требуется идентификатор клиента прокси-сервера, который защищает веб-сервер Airflow. API Cloud Composer не предоставляет эту информацию напрямую. Вместо этого сделайте неаутентифицированный запрос к веб-серверу Airflow и получите идентификатор клиента из URL-адреса перенаправления. Мы собираемся сделать это, запустив файл Python с помощью Cloud Shell для получения идентификатора клиента.
Загрузите необходимый код с GitHub, выполнив следующую команду в Cloud Shell.
cd git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
Если вы получили сообщение об ошибке, поскольку этот каталог уже существует, обновите его до последней версии, выполнив следующую команду
cd python-docs-samples/ git pull origin master
Перейдите в соответствующий каталог, запустив
cd python-docs-samples/composer/rest
Запустите код Python, чтобы получить свой идентификатор клиента, заменив имя вашего проекта на <your-project-id>
, местоположение среды Composer, которую вы создали ранее, на <your-composer-location>
и имя среды Composer, которую вы создали. созданный ранее для <your-composer-environment>
python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>
Например, если имя вашего проекта — my-project
, расположение вашего Composer — us-central1
, а имя вашей среды — my-composer
, ваша команда будет такой:
python3 get_client_id.py my-project us-central1 my-composer
get_client_id.py
делает следующее:
- Аутентификация с помощью Google Cloud
- Выполняет неаутентифицированный HTTP-запрос к веб-серверу Airflow, чтобы получить URI перенаправления.
- Извлекает параметр запроса
client_id
из этого перенаправления. - Распечатывает его для использования
Ваш идентификатор клиента будет распечатан в командной строке и будет выглядеть примерно так:
12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com
4. Создайте свою функцию
В Cloud Shell клонируйте репозиторий с необходимым примером кода, запустив
cd git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git
Перейдите в нужный каталог и оставьте Cloud Shell открытым, пока выполните следующие несколько шагов.
cd nodejs-docs-samples/composer/functions/composer-storage-trigger
Перейдите на страницу функций Google Cloud, щелкнув меню навигации, а затем нажав «Функции облака». | |
Нажмите «СОЗДАТЬ ФУНКЦИЮ» вверху страницы. | |
Назовите свою функцию «my-function» и оставьте для памяти значение по умолчанию — 256 МБ. | |
Установите триггер на «Облачное хранилище», оставьте тип события «Завершить/Создать» и перейдите к сегменту, созданному на этапе «Создание сегмента облачного хранилища». | |
Оставьте для исходного кода значение «Встроенный редактор» и установите среду выполнения «Node.js 8». |
В Cloud Shell выполните следующую команду. Это откроет index.js и package.json в редакторе Cloud Shell.
cloudshell edit index.js package.json
Перейдите на вкладку package.json, скопируйте этот код и вставьте его в раздел package.json встроенного редактора Cloud Functions. | |
Установите для «Функции для выполнения» значение триггерDag. | |
Перейдите на вкладку index.js, скопируйте код и вставьте его в раздел index.js встроенного редактора облачных функций. | |
Измените |
В Cloud Shell выполните следующую команду, заменив <your-environment-name> именем вашей среды Composer и <your-composer-region> регионом, в котором расположена ваша среда Composer.
gcloud composer environments describe <your-environment-name> --location <your-composer-region>
Например, если ваша среда называется my-composer-environment
и расположена в us-central1
ваша команда будет такой:
gcloud composer environments describe my-composer-environment --location us-central1
Вывод должен выглядеть примерно так:
config:
airflowUri: https://abc123efghi456k-tp.appspot.com
dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
nodeConfig:
diskSizeGb: 100
location: projects/a-project/zones/narnia-north1-b
machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
network: projects/a-project/global/networks/default
oauthScopes:
- https://www.googleapis.com/auth/cloud-platform
serviceAccount: 987665432-compute@developer.gserviceaccount.com
nodeCount: 3
softwareConfig:
imageVersion: composer-1.7.0-airflow-1.10.0
pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456
В этом выводе найдите переменную с именем | |
Нажмите раскрывающуюся ссылку «Дополнительно», затем выберите географически ближайший к вам регион. | |
Установите флажок «Повторить попытку в случае неудачи». | |
Нажмите «Создать», чтобы создать свою облачную функцию. |
Шаги по коду
Код, который вы скопировали из index.js, будет выглядеть примерно так:
// [START composer_trigger]
'use strict';
const fetch = require('node-fetch');
const FormData = require('form-data');
/**
* Triggered from a message on a Cloud Storage bucket.
*
* IAP authorization based on:
* https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
* and
* https://cloud.google.com/iap/docs/authentication-howto
*
* @param {!Object} data The Cloud Functions event data.
* @returns {Promise}
*/
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
/**
* @param {string} clientId The client id associated with the Composer webserver application.
* @param {string} projectId The id for the project containing the Cloud Function.
* @param {string} userAgent The user agent string which will be provided with the webserver request.
*/
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
/**
* @param {string} url The url that the post request targets.
* @param {string} body The body of the post request.
* @param {string} idToken Bearer token used to authorize the iap request.
* @param {string} userAgent The user agent to identify the requester.
*/
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
// [END composer_trigger]
Давайте посмотрим, что происходит. Здесь есть три функции: triggerDag
, authorizeIap
и makeIapPostRequest
triggerDag
— это функция, которая срабатывает, когда мы загружаем что-то в назначенную корзину Cloud Storage. Здесь мы настраиваем важные переменные, используемые в других запросах, такие как PROJECT_ID
, CLIENT_ID
, WEBSERVER_ID
и DAG_NAME
. Он вызывает authorizeIap
и makeIapPostRequest
.
exports.triggerDag = async data => {
// Fill in your Composer environment information here.
// The project that holds your function
const PROJECT_ID = 'your-project-id';
// Navigate to your webserver's login page and get this from the URL
const CLIENT_ID = 'your-iap-client-id';
// This should be part of your webserver's URL:
// {tenant-project-id}.appspot.com
const WEBSERVER_ID = 'your-tenant-project-id';
// The name of the DAG you wish to trigger
const DAG_NAME = 'composer_sample_trigger_response_dag';
// Other constants
const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
const USER_AGENT = 'gcf-event-trigger';
const BODY = {conf: JSON.stringify(data)};
// Make the request
try {
const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);
return makeIapPostRequest(
WEBSERVER_URL,
BODY,
iap.idToken,
USER_AGENT,
iap.jwt
);
} catch (err) {
throw new Error(err);
}
};
authorizeIap
отправляет запрос к прокси-серверу, который защищает веб-сервер Airflow, используя учетную запись службы и «обменивая» JWT на идентификационный токен, который будет использоваться для аутентификации makeIapPostRequest
.
const authorizeIap = async (clientId, projectId, userAgent) => {
const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
const JWT_HEADER = Buffer.from(
JSON.stringify({alg: 'RS256', typ: 'JWT'})
).toString('base64');
let jwt = '';
let jwtClaimset = '';
// Obtain an Oauth2 access token for the appspot service account
const res = await fetch(
`http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
{
headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
}
);
const tokenResponse = await res.json();
if (tokenResponse.error) {
return Promise.reject(tokenResponse.error);
}
const accessToken = tokenResponse.access_token;
const iat = Math.floor(new Date().getTime() / 1000);
const claims = {
iss: SERVICE_ACCOUNT,
aud: 'https://www.googleapis.com/oauth2/v4/token',
iat: iat,
exp: iat + 60,
target_audience: clientId,
};
jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
const toSign = [JWT_HEADER, jwtClaimset].join('.');
const blob = await fetch(
`https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
{
method: 'POST',
body: JSON.stringify({
bytesToSign: Buffer.from(toSign).toString('base64'),
}),
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${accessToken}`,
},
}
);
const blobJson = await blob.json();
if (blobJson.error) {
return Promise.reject(blobJson.error);
}
// Request service account signature on header and claimset
const jwtSignature = blobJson.signature;
jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
const form = new FormData();
form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
form.append('assertion', jwt);
const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
method: 'POST',
body: form,
});
const tokenJson = await token.json();
if (tokenJson.error) {
return Promise.reject(tokenJson.error);
}
return {
jwt: jwt,
idToken: tokenJson.id_token,
};
};
makeIapPostRequest
вызывает веб-сервер Airflow для запуска composer_sample_trigger_response_dag.
Имя группы обеспечения доступности баз данных встроено в URL-адрес веб-сервера Airflow, передаваемый с параметром url
, а idToken
— это токен, который мы получили в запросе authorizeIap
.
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
const res = await fetch(url, {
method: 'POST',
headers: {
'User-Agent': userAgent,
Authorization: `Bearer ${idToken}`,
},
body: JSON.stringify(body),
});
if (!res.ok) {
const err = await res.text();
throw new Error(err);
}
};
5. Настройте DAG
В Cloud Shell перейдите в каталог с примерами рабочих процессов. Это часть образцов python-docs, которые вы скачали с GitHub на этапе получения идентификатора клиента.
cd cd python-docs-samples/composer/workflows
Загрузите DAG в Composer
Загрузите образец DAG в корзину хранилища DAG вашей среды Composer с помощью следующей команды, где <environment_name>
— это имя вашей среды Composer, а <location>
— имя региона, в котором он расположен. trigger_response_dag.py
— это группа обеспечения доступности баз данных, с которой мы будем работать.
gcloud composer environments storage dags import \ --environment <environment_name> \ --location <location> \ --source trigger_response_dag.py
Например, если ваша среда Composer называлась my-composer
и располагалась в us-central1
, ваша команда будет такой:
gcloud composer environments storage dags import \ --environment my-composer \ --location us-central1 \ --source trigger_response_dag.py
Шаг через DAG
Код DAG в trigger_response.py
выглядит следующим образом.
import datetime
import airflow
from airflow.operators import bash_operator
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
Раздел default_args
содержит аргументы по умолчанию, необходимые для модели BaseOperator в Apache Airflow. Этот раздел с этими параметрами можно увидеть в любой группе обеспечения доступности баз данных Apache Airflow. В owner
в настоящее время установлено Composer Example
, но при желании вы можете изменить это имя на свое. depends_on_past
показывает нам, что эта группа обеспечения доступности баз данных не зависит от каких-либо предыдущих групп обеспечения доступности баз данных. Три раздела электронной почты: email
, email_on_failure
и email_on_retry
настроены так, что уведомления по электронной почте не приходят в зависимости от состояния этой группы обеспечения доступности баз данных. Группа обеспечения доступности баз данных будет повторять попытку только один раз, поскольку для retries
установлено значение 1, и сделает это через пять минут для каждого retry_delay
. start_date
обычно определяет, когда должна запускаться группа обеспечения доступности баз данных, в сочетании с ее schedule_interval
(устанавливается позже), но в случае этой группы обеспечения доступности баз данных это не имеет значения. Он установлен на 1 января 2017 года, но может быть установлен на любую прошлую дату.
default_args = {
'owner': 'Composer Example',
'depends_on_past': False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017, 1, 1),
}
Раздел with airflow.DAG
настраивает группу обеспечения доступности баз данных, которая будет запущена. Он будет запущен с идентификатором задачи composer_sample_trigger_response_dag
, аргументами по умолчанию из раздела default_args
и, что наиболее важно, с schedule_interval
, равным None
. Для schedule_interval
установлено значение None
поскольку мы запускаем этот конкретный DAG с помощью нашей облачной функции. Вот почему start_date
в default_args
не имеет значения.
При выполнении группа обеспечения доступности баз данных печатает свою конфигурацию, как указано в переменной print_gcs_info
.
with airflow.DAG(
'composer_sample_trigger_response_dag',
default_args=default_args,
# Not scheduled, trigger only
schedule_interval=None) as dag:
# Print the dag_run's configuration, which includes information about the
# Cloud Storage object change.
print_gcs_info = bash_operator.BashOperator(
task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')
6. Проверьте свою функцию
Откройте среду Composer и в строке с именем вашей среды щелкните ссылку Airflow. | |
Откройте | |
Откройте отдельную вкладку и загрузите любой файл в корзину Cloud Storage, которую вы создали ранее и указали в качестве триггера для вашей облачной функции. Вы можете сделать это через консоль или с помощью команды gsutil . | |
Вернитесь на вкладку с пользовательским интерфейсом Airflow и нажмите «Просмотр графика». | |
Нажмите на задачу | |
Нажмите «Просмотреть журнал» в правом верхнем углу меню. | |
В журналах вы увидите информацию о файле, который вы загрузили в корзину Cloud Storage. |
Поздравляем! Вы только что активировали группу обеспечения доступности баз данных Airflow, используя Node.js и функции Google Cloud!
7. Очистка
Чтобы избежать списания средств с вашей учетной записи GCP за ресурсы, используемые в этом кратком руководстве:
- (Необязательно) Чтобы сохранить данные, загрузите их из сегмента Cloud Storage для среды Cloud Composer и сегмента хранилища, созданного вами для этого краткого руководства.
- Удалите сегмент Cloud Storage для среды и созданный вами.
- Удалите среду Cloud Composer . Обратите внимание, что удаление вашей среды не приводит к удалению сегмента хранилища для этой среды.
- (Необязательно) При использовании бессерверных вычислений первые 2 миллиона вызовов в месяц бесплатны, а когда вы масштабируете функцию до нуля, с вас не взимается плата (более подробную информацию см. в разделе цены ). Однако, если вы хотите удалить свою облачную функцию, сделайте это, нажав «УДАЛИТЬ» в правом верхнем углу страницы обзора вашей функции.
Вы также можете при желании удалить проект:
- В консоли GCP перейдите на страницу «Проекты» .
- В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить».
- В поле введите идентификатор проекта и нажмите «Завершить работу» , чтобы удалить проект.