1. Введение - Google Dataproc
Dataproc — это полностью управляемый и масштабируемый сервис для запуска Apache Spark, Apache Flink, Presto и многих других инструментов и фреймворков с открытым исходным кодом. Используйте Dataproc для модернизации озер данных, ETL/ELT и безопасной обработки данных в глобальном масштабе. Dataproc также полностью интегрирован с несколькими сервисами Google Cloud, включая BigQuery , Cloud Storage , Vertex AI и Dataplex .
Dataproc доступен в трех вариантах:
- Dataproc Serverless позволяет запускать задания PySpark без необходимости настройки инфраструктуры и автомасштабирования. Dataproc Serverless поддерживает пакетные рабочие нагрузки PySpark, а также сессии/блокноты.
- Dataproc на Google Compute Engine позволяет управлять кластером Hadoop YARN для рабочих нагрузок Spark на основе YARN, а также использовать инструменты с открытым исходным кодом, такие как Flink и Presto. Вы можете настраивать свои облачные кластеры с любым желаемым вертикальным или горизонтальным масштабированием, включая автомасштабирование.
- Dataproc в Google Kubernetes Engine позволяет настраивать виртуальные кластеры Dataproc в вашей инфраструктуре GKE для отправки заданий Spark, PySpark, SparkR или Spark SQL.
2. Создайте кластер Dataproc в виртуальной частной сети Google Cloud.
На этом шаге вы создадите кластер Dataproc в Google Cloud, используя консоль Google Cloud.
В качестве первого шага включите API службы Dataproc в консоли. После включения найдите «Dataproc» в строке поиска и нажмите «Создать кластер» .
Выберите «Кластер» в Compute Engine , чтобы использовать виртуальные машины Google Compute Engine (GCE) в качестве базовой инфраструктуры для запуска кластеров Dataproc.

Вы находитесь на странице создания кластера.

На этой странице:
- Укажите уникальное имя для кластера.
- Выберите конкретный регион . Вы также можете выбрать зону, однако Dataproc позволяет автоматически выбрать её для вас. Для этого практического занятия выберите «us-central1» и «us-central1-c».
- Выберите тип кластера «Стандартный». Это гарантирует наличие одного главного узла.
- На вкладке «Настройка узлов» подтвердите, что количество создаваемых рабочих узлов будет равно двум.
- В разделе «Настройка кластера» установите флажок рядом с пунктом «Включить шлюз компонентов». Это позволит получить доступ к веб-интерфейсам кластера, включая пользовательский интерфейс Spark, менеджер узлов Yarn и блокноты Jupyter.
- В разделе «Дополнительные компоненты» выберите Jupyter Notebook. Это настроит кластер с сервером Jupyter Notebook.
- Оставьте все остальные параметры без изменений и нажмите «Создать кластер».
Это позволит развернуть кластер Dataproc.
3. Запустите кластер и подключитесь к нему по SSH.
После того как статус кластера изменится на «Работает» , щелкните по имени кластера в консоли Dataproc.

Перейдите на вкладку «Экземпляр виртуальной машины» , чтобы просмотреть главный узел и два рабочих узла кластера.

Чтобы войти в главный узел, нажмите на кнопку SSH рядом с ним.

Выполните команды hdfs, чтобы увидеть структуру каталогов.
hadoop_commands_example
sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51
sudo hadoop fs -ls /
4. Веб-интерфейсы и шлюзы компонентов
В консоли кластера Dataproc щелкните имя вашего кластера, затем перейдите на вкладку «ВЕБ-ИНТЕРФЕЙСЫ» .

Здесь показаны доступные веб-интерфейсы, включая Jupyter . Щелкните Jupyter , чтобы открыть блокнот Jupyter. Вы можете использовать это для создания блокнотов в PySpark, хранящихся в GCS. Чтобы сохранить свой блокнот в Google Cloud Storage, откройте блокнот PySpark для использования в этом практическом задании.
5. Мониторинг и наблюдение за заданиями Spark.
После запуска кластера Dataproc создайте пакетное задание PySpark и отправьте его в кластер Dataproc.
Создайте сегмент Google Cloud Storage (GCS) для хранения скрипта PySpark. Убедитесь, что сегмент создан в том же регионе, что и кластер Dataproc.

После создания хранилища GCS скопируйте в него следующий файл.
https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py
Этот скрипт создает пример Spark DataFrame и записывает его в виде таблицы Hive.
hive_job.py
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")
Отправьте этот скрипт в качестве пакетного задания Spark в Dataproc. Щелкните «Задания» в левом навигационном меню, а затем нажмите «Отправить задание».

Укажите идентификатор задания и регион . Выберите свой кластер и укажите местоположение GCS скопированного вами скрипта Spark. Это задание будет выполняться как пакетное задание Spark в Dataproc.
В разделе «Свойства» добавьте ключ spark.submit.deployMode и значение client , чтобы драйвер запускался на главном узле Dataproc, а не на рабочих узлах. Нажмите «Отправить» , чтобы отправить пакетное задание в Dataproc.

Скрипт Spark создаст DataFrame и запишет данные в таблицу Hive test_table_1 .
После успешного выполнения задания вы сможете увидеть сообщения, выведенные в консоль, на вкладке «Мониторинг» .

Теперь, когда таблица Hive создана, отправьте еще одно задание запроса Hive, чтобы выбрать содержимое таблицы и отобразить его в консоли.
Создайте еще одно задание со следующими свойствами:

Обратите внимание, что тип задания установлен на Hive , а тип источника запроса — Query Text , что означает, что мы запишем весь оператор HiveQL в текстовое поле Query Text .
Отправьте задание, оставив остальные параметры по умолчанию.

Обратите внимание, как HiveQL выбирает все записи и отображает их в консоли.
6. Автомасштабирование
Автомасштабирование — это задача оценки «правильного» количества рабочих узлов кластера для заданной рабочей нагрузки.
API Dataproc AutoscalingPolicies предоставляет механизм для автоматизации управления ресурсами кластера и позволяет осуществлять автоматическое масштабирование виртуальных машин рабочих узлов кластера. Политика автоматического масштабирования — это многократно используемая конфигурация, описывающая, как должны масштабироваться рабочие узлы кластера, использующие эту политику. Она определяет границы масштабирования, частоту и агрессивность, обеспечивая точный контроль над ресурсами кластера на протяжении всего его жизненного цикла.
Политики автомасштабирования Dataproc записываются с использованием YAML-файлов, которые либо передаются в команде CLI для создания кластера, либо выбираются из хранилища GCS при создании кластера через Cloud Console.
Вот пример политики автомасштабирования Dataproc:
policy.yaml
workerConfig:
minInstances: 10
maxInstances: 10
secondaryWorkerConfig:
maxInstances: 50
basicAlgorithm:
cooldownPeriod: 4m
yarnConfig:
scaleUpFactor: 0.05
scaleDownFactor: 1.0
gracefulDecommissionTimeout: 1h
7. Настройка дополнительных компонентов Dataproc
Это позволит развернуть кластер Dataproc.
При создании кластера Dataproc стандартные компоненты экосистемы Apache Hadoop автоматически устанавливаются в кластер (см. Список версий Dataproc ). При создании кластера можно установить дополнительные компоненты, называемые необязательными компонентами .

При создании кластера Dataproc из консоли мы включили необязательные компоненты и выбрали Jupyter Notebook в качестве необязательного компонента.
8. Очистка ресурсов
Чтобы очистить кластер, после выбора кластера в консоли Dataproc нажмите кнопку «Остановить» . После остановки кластера нажмите кнопку «Удалить» , чтобы удалить кластер.
После удаления кластера Dataproc удалите сегменты GCS, куда был скопирован код.
Для освобождения ресурсов и прекращения нежелательного выставления счетов необходимо сначала остановить, а затем удалить кластер Dataproc.
Перед остановкой и удалением кластера убедитесь, что все данные, записанные в хранилище HDFS, скопированы в GCS для обеспечения надежного хранения.
Чтобы остановить кластер, нажмите кнопку «Остановить» .

После остановки кластера нажмите кнопку «Удалить» , чтобы удалить кластер.
В диалоговом окне подтверждения нажмите «Удалить» , чтобы удалить кластер.
