Запуск задания подсчета слов Hadoop в кластере Dataproc

1. Введение

Рабочие процессы являются распространенным вариантом использования в аналитике данных: они включают в себя прием, преобразование и анализ данных для поиска в них значимой информации. В Google Cloud Platform инструментом для организации рабочих процессов является Cloud Composer, который представляет собой размещенную версию популярного инструмента рабочих процессов с открытым исходным кодом Apache Airflow. В ходе этой лабораторной работы вы будете использовать Cloud Composer для создания простого рабочего процесса, который создает кластер Cloud Dataproc, анализирует его с помощью Cloud Dataproc и Apache Hadoop, а затем удаляет кластер Cloud Dataproc.

Что такое Cloud Composer?

Cloud Composer — это полностью управляемая служба оркестрации рабочих процессов, которая позволяет вам создавать, планировать и отслеживать конвейеры, охватывающие облака и локальные центры обработки данных. Cloud Composer создан на основе популярного проекта с открытым исходным кодом Apache Airflow и работает с использованием языка программирования Python. Он не требует блокировки и прост в использовании.

Используя Cloud Composer вместо локального экземпляра Apache Airflow, пользователи могут воспользоваться всеми преимуществами Airflow без затрат на установку или управление.

Что такое Apache Airflow?

Apache Airflow — это инструмент с открытым исходным кодом, используемый для программного создания, планирования и мониторинга рабочих процессов. Следует запомнить несколько ключевых терминов, касающихся Airflow, которые вы будете встречать на протяжении всей лабораторной работы:

  • DAG — DAG (направленный ациклический граф) — это набор организованных задач, которые вы хотите запланировать и запустить. Группы обеспечения доступности баз данных, также называемые рабочими процессами, определяются в стандартных файлах Python.
  • Оператор — оператор описывает одну задачу в рабочем процессе.

Что такое Cloud Dataproc?

Cloud Dataproc — это полностью управляемый сервис Apache Spark и Apache Hadoop от Google Cloud Platform. Cloud Dataproc легко интегрируется с другими сервисами GCP, предоставляя вам мощную и полноценную платформу для обработки данных, аналитики и машинного обучения.

Что ты будешь делать

В этой лаборатории кода показано, как создать и запустить рабочий процесс Apache Airflow в Cloud Composer, который выполняет следующие задачи:

  • Создает кластер Cloud Dataproc.
  • Запускает задание подсчета слов Apache Hadoop в кластере и выводит его результаты в облачное хранилище.
  • Удаляет кластер

Что вы узнаете

  • Как создать и запустить рабочий процесс Apache Airflow в Cloud Composer
  • Как использовать Cloud Composer и Cloud Dataproc для анализа набора данных
  • Как получить доступ к среде Cloud Composer через консоль Google Cloud Platform, Cloud SDK и веб-интерфейс Airflow

Что вам понадобится

  • аккаунт GCP
  • Базовые знания CLI
  • Базовое понимание Python

2. Настройка GCP

Создать проект

Выберите или создайте проект Google Cloud Platform.

Запишите свой идентификатор проекта, который вы будете использовать на последующих этапах.

Если вы создаете новый проект, идентификатор проекта находится прямо под именем проекта на странице создания.

Если вы уже создали проект, вы можете найти его идентификатор на главной странице консоли в карточке «Информация о проекте».

Включите API

Включите API Cloud Composer, Cloud Dataproc и Cloud Storage . Как только они будут включены, вы можете игнорировать кнопку с надписью «Перейти к учетным данным» и перейти к следующему шагу руководства.

Создать среду композитора

Создайте среду Cloud Composer со следующей конфигурацией:

  • Имя: моя-композитор-среда
  • Местоположение: us-central1
  • Зона: us-central1-a

Все остальные конфигурации могут оставаться по умолчанию. Нажмите «Создать» внизу.

Создать сегмент облачного хранилища

В своем проекте создайте корзину Cloud Storage со следующей конфигурацией:

  • Имя: <идентификатор вашего проекта>
  • Класс хранилища по умолчанию: Многорегиональный.
  • Местоположение: США
  • Модель контроля доступа: мелкозернистая

Нажмите «Создать», когда будете готовы.

3. Настройка Apache Airflow

Просмотр информации о среде Composer

В консоли GCP откройте страницу «Среды» .

Щелкните имя среды, чтобы просмотреть ее сведения.

На странице сведений о среде представлена ​​такая информация, как URL-адрес веб-интерфейса Airflow, идентификатор кластера Google Kubernetes Engine, имя сегмента Cloud Storage и путь к папке /dags.

В Airflow DAG (направленный ациклический график) — это набор организованных задач, которые вы хотите запланировать и запустить. Группы обеспечения доступности баз данных, также называемые рабочими процессами, определяются в стандартных файлах Python. Cloud Composer планирует группы обеспечения доступности баз данных только в папке /dags. Папка /dags находится в сегменте Cloud Storage, который Cloud Composer создает автоматически при создании среды.

Настройка переменных среды Apache Airflow

Переменные Apache Airflow — это концепция, специфичная для Airflow, которая отличается от переменных среды . На этом этапе вы установите следующие три переменные Airflow : gcp_project , gcs_bucket и gce_zone .

Использование gcloud для установки переменных

Сначала откройте Cloud Shell , в котором для вас удобно установлен Cloud SDK.

Установите для переменной среды COMPOSER_INSTANCE имя вашей среды Composer.

COMPOSER_INSTANCE=my-composer-environment

Чтобы установить переменные Airflow с помощью инструмента командной строки gcloud, используйте команду gcloud composer environments run с подкомандой variables . Эта команда gcloud composer выполняет variables подкоманды Airflow CLI. Подкоманда передает аргументы инструменту командной строки gcloud .

Вы запустите эту команду три раза, заменяя переменные теми, которые относятся к вашему проекту.

Установите gcp_project с помощью следующей команды, заменив <your-project-id> идентификатором проекта, который вы записали на шаге 2.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

Ваш результат будет выглядеть примерно так

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

Задайте gcs_bucket с помощью следующей команды, заменив <your-bucket-name> идентификатором корзины, который вы записали на шаге 2. Если вы последовали нашей рекомендации, имя вашего корзины будет таким же, как идентификатор вашего проекта. Вывод будет аналогичен предыдущей команде.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

Установите gce_zone с помощью следующей команды. Ваш вывод будет аналогичен предыдущим командам.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(Необязательно) Использование gcloud для просмотра переменной

Чтобы увидеть значение переменной, запустите variables Airflow CLI с аргументом get или используйте пользовательский интерфейс Airflow .

Например:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

Вы можете сделать это с помощью любой из трех только что установленных переменных: gcp_project , gcs_bucket и gce_zone .

4. Пример рабочего процесса

Давайте взглянем на код группы обеспечения доступности баз данных, которую мы будем использовать на шаге 5. Пока не беспокойтесь о загрузке файлов, просто следуйте инструкциям здесь.

Здесь есть что раскрыть, поэтому давайте немного разберемся.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

Начнем с импорта Airflow:

  • airflow.models — позволяет нам получать доступ и создавать данные в базе данных Airflow.
  • airflow.contrib.operators — Где живут операторы сообщества. В этом случае нам нужен dataproc_operator для доступа к API Cloud Dataproc.
  • airflow.utils.trigger_rule — для добавления правил триггера нашим операторам. Правила триггеров позволяют детально контролировать, должен ли оператор выполняться в зависимости от статуса его родительских элементов.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

Это указывает местоположение нашего выходного файла. Примечательной строкой здесь является models.Variable.get('gcs_bucket') которая извлекает значение переменной gcs_bucket из базы данных Airflow.

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR — расположение файла .jar, который мы в конечном итоге запустим в кластере Cloud Dataproc. Он уже размещен для вас на GCP.
  • input_file — расположение файла, содержащего данные, которые в конечном итоге будут вычисляться нашим заданием Hadoop. Мы вместе загрузим данные в это место на шаге 5.
  • wordcount_args — Аргументы, которые мы передадим в jar-файл.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

Это даст нам эквивалент объекта datetime, представляющий полночь предыдущего дня. Например, если это выполняется в 11:00 4 марта, объект datetime будет представлять 00:00 3 марта. Это связано с тем, как Airflow обрабатывает планирование. Более подробную информацию об этом можно найти здесь .

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

Переменная default_dag_args в форме словаря должна предоставляться при каждом создании новой группы обеспечения доступности баз данных:

  • 'email_on_failure' — указывает, следует ли отправлять оповещения по электронной почте в случае сбоя задачи.
  • 'email_on_retry' — указывает, следует ли отправлять оповещения по электронной почте при повторной попытке выполнения задачи.
  • 'retries' — обозначает, сколько повторных попыток Airflow должен выполнить в случае сбоя DAG.
  • 'retry_delay' — обозначает, как долго Airflow должен ждать перед попыткой повторной попытки.
  • 'project_id' — сообщает группе обеспечения доступности баз данных, с каким идентификатором проекта GCP его связать, который понадобится позже оператору Dataproc.
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Использование with models.DAG указывает сценарию включить все, что находится ниже него, в одну и ту же группу DAG. Мы также видим три переданных аргумента:

  • Первая строка — это имя, которое будет присвоено создаваемой нами группе обеспечения доступности баз данных. В данном случае мы используем composer_hadoop_tutorial .
  • schedule_interval — объект datetime.timedelta , для которого здесь мы установили один день. Это означает, что эта группа обеспечения доступности баз данных будет пытаться выполниться один раз в день после 'start_date' , установленного ранее в 'default_dag_args'
  • default_args — словарь, который мы создали ранее, содержащий аргументы по умолчанию для группы обеспечения доступности баз данных.

Создайте кластер Dataproc

Далее мы создадим dataproc_operator.DataprocClusterCreateOperator , который создаст кластер Cloud Dataproc.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

Внутри этого оператора мы видим несколько аргументов, все из которых, кроме первого, специфичны для этого оператора:

  • task_id — как и в BashOperator, это имя, которое мы присваиваем оператору, которое можно просмотреть в пользовательском интерфейсе Airflow.
  • cluster_name — имя, которое мы присваиваем кластеру Cloud Dataproc. Здесь мы назвали его composer-hadoop-tutorial-cluster- (дополнительную информацию см. в информационном окне).
  • num_workers — количество воркеров, которые мы выделяем в кластер Cloud Dataproc.
  • zone — географический регион, в котором мы хотим разместить кластер, сохраненный в базе данных Airflow. Это прочитает переменную 'gce_zone' которую мы установили на шаге 3.
  • master_machine_type — тип машины, которую мы хотим назначить мастеру Cloud Dataproc.
  • worker_machine_type — тип машины, которую мы хотим назначить рабочему процессу Cloud Dataproc.

Отправьте задание Apache Hadoop

dataproc_operator.DataProcHadoopOperator позволяет нам отправлять задание в кластер Cloud Dataproc.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

Мы предоставляем несколько параметров:

  • task_id — Имя, которое мы присваиваем этому фрагменту DAG.
  • main_jar — расположение файла .jar, который мы хотим запустить в кластере.
  • cluster_name — имя кластера, для которого будет выполняться задание, которое, как вы заметите, идентично тому, что мы находим в предыдущем операторе.
  • arguments — аргументы, которые передаются в файл jar, как если бы вы выполняли файл .jar из командной строки.

Удалить кластер

Последний оператор, который мы создадим, — это dataproc_operator.DataprocClusterDeleteOperator .

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

Как следует из названия, этот оператор удалит данный кластер Cloud Dataproc. Здесь мы видим три аргумента:

  • task_id — как и в BashOperator, это имя, которое мы присваиваем оператору, которое можно просмотреть в пользовательском интерфейсе Airflow.
  • cluster_name — имя, которое мы присваиваем кластеру Cloud Dataproc. Здесь мы назвали его composer-hadoop-tutorial-cluster- (см. информационное окно после «Создание кластера Dataproc» для получения дополнительной информации).
  • trigger_rule — мы кратко упомянули правила триггеров во время импорта в начале этого шага, но здесь мы видим одно из них в действии. По умолчанию оператор Airflow не выполняется, если все его вышестоящие операторы не завершились успешно. Правило триггера ALL_DONE требует только того, чтобы все вышестоящие операторы завершили работу, независимо от того, были ли они успешными. В данном случае это означает, что даже если задание Hadoop завершилось неудачно, мы все равно хотим отключить кластер.
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Наконец, мы хотим, чтобы эти операторы выполнялись в определенном порядке, и мы можем обозначить это с помощью операторов битового сдвига Python. В этом случае сначала всегда будет выполняться create_dataproc_cluster , затем run_dataproc_hadoop и, наконец, delete_dataproc_cluster .

Собрав все это вместе, код выглядит следующим образом:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. Загрузите файлы Airflow в облачное хранилище.

Скопируйте DAG в папку /dags.

  1. Сначала откройте Cloud Shell , в котором для вас удобно установлен Cloud SDK.
  2. Клонируйте репозиторий образцов Python и перейдите в каталог композитора/рабочих процессов.
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. Выполните следующую команду, чтобы задать имя папки DAG для переменной среды.
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. Запустите следующую команду gsutil , чтобы скопировать код учебника в папку /dags.
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

Ваш вывод будет выглядеть примерно так:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. Использование пользовательского интерфейса Airflow

Чтобы получить доступ к веб-интерфейсу Airflow с помощью консоли GCP:

  1. Откройте страницу «Среды» .
  2. В столбце «Веб-сервер Airflow» для среды щелкните значок нового окна. Веб-интерфейс Airflow откроется в новом окне браузера.

Дополнительную информацию о пользовательском интерфейсе Airflow см. в разделе Доступ к веб-интерфейсу .

Просмотр переменных

Переменные, которые вы установили ранее, сохраняются в вашей среде. Вы можете просмотреть переменные, выбрав «Администратор» > «Переменные» в строке меню пользовательского интерфейса Airflow.

List tab is selected and shows a table with the following keys and values key: gcp_project, value: project-id key: gcs_bucket, value: gs://bucket-name key: gce_zone, value: zone

Изучение запусков DAG

Когда вы загружаете файл DAG в папку dags в Cloud Storage, Cloud Composer анализирует файл. Если ошибок не обнаружено, имя рабочего процесса появляется в списке DAG, и рабочий процесс ставится в очередь для немедленного запуска. Чтобы просмотреть свои группы обеспечения доступности баз данных, нажмите на группы обеспечения доступности баз данных в верхней части страницы.

84a29c71f20bff98.png

Нажмите composer_hadoop_tutorial , чтобы открыть страницу сведений о группе обеспечения доступности баз данных. Эта страница содержит графическое представление задач и зависимостей рабочего процесса.

f4f1663c7a37f47c.png

Теперь на панели инструментов нажмите «Просмотр графика» , а затем наведите указатель мыши на изображение каждой задачи, чтобы увидеть ее статус. Обратите внимание, что рамка вокруг каждой задачи также указывает на ее статус (зеленая рамка = выполняется; красная = сбой и т. д.).

4c5a0c6fa9f88513.png

Чтобы снова запустить рабочий процесс из представления графика :

  1. В представлении графика пользовательского интерфейса Airflow щелкните изображение create_dataproc_cluster .
  2. Нажмите «Очистить» , чтобы сбросить три задачи, а затем нажмите «ОК» для подтверждения.

fd1b23b462748f47.png

Вы также можете проверить состояние и результаты рабочего процесса composer-hadoop-tutorial перейдя на следующие страницы консоли GCP:

  • Cloud Dataproc Clusters для мониторинга создания и удаления кластеров. Обратите внимание, что кластер, созданный рабочим процессом, является эфемерным: он существует только на протяжении всего рабочего процесса и удаляется как часть последней задачи рабочего процесса.
  • Cloud Dataproc Jobs для просмотра или мониторинга задания подсчета слов Apache Hadoop. Щелкните идентификатор задания, чтобы просмотреть выходные данные журнала задания.
  • Браузер Cloud Storage , чтобы просмотреть результаты подсчета слов в папке wordcount в сегменте Cloud Storage, созданном для этой лаборатории кода.

7. Очистка

Чтобы избежать взимания платы с вашей учетной записи GCP за ресурсы, используемые в этой лаборатории кода:

  1. (Необязательно) Чтобы сохранить данные, загрузите их из сегмента Cloud Storage для среды Cloud Composer и сегмента хранилища, созданного вами для этой лаборатории кода.
  2. Удалите сегмент Cloud Storage , созданный для этой лаборатории кода.
  3. Удалите сегмент Cloud Storage для среды.
  4. Удалите среду Cloud Composer . Обратите внимание, что удаление вашей среды не приводит к удалению сегмента хранилища для этой среды.

Вы также можете при желании удалить проект:

  1. В консоли GCP перейдите на страницу «Проекты» .
  2. В списке проектов выберите проект, который хотите удалить, и нажмите «Удалить».
  3. В поле введите идентификатор проекта и нажмите «Завершить работу» , чтобы удалить проект.