Запустите конвейер обработки больших данных в Cloud Dataflow.

1. Обзор

Облако-Dataflow.png

Что такое поток данных?

Dataflow — это управляемая служба для выполнения самых разных шаблонов обработки данных. В документации на этом сайте показано, как развернуть конвейеры пакетной и потоковой обработки данных с помощью Dataflow, включая инструкции по использованию функций службы.

Apache Beam SDK — это модель программирования с открытым исходным кодом, которая позволяет разрабатывать как пакетные, так и потоковые конвейеры. Вы создаете свои конвейеры с помощью программы Apache Beam, а затем запускаете их в службе Dataflow. Документация Apache Beam содержит подробную концептуальную информацию и справочные материалы по модели программирования Apache Beam, пакетам SDK и другим средам выполнения.

Потоковая аналитика данных с высокой скоростью

Dataflow обеспечивает быструю и упрощенную разработку конвейера потоковых данных с меньшей задержкой данных.

Упрощение операций и управления

Позвольте командам сосредоточиться на программировании, а не на управлении кластерами серверов, поскольку бессерверный подход Dataflow устраняет операционные накладные расходы при рабочих нагрузках по обработке данных.

Снижение совокупной стоимости владения

Автоматическое масштабирование ресурсов в сочетании с возможностями пакетной обработки с оптимизацией затрат означает, что Dataflow предлагает практически безграничные возможности для управления сезонными и скачкообразными рабочими нагрузками без перерасхода средств.

Ключевые особенности

Автоматизированное управление ресурсами и динамическая ребалансировка работы

Dataflow автоматизирует предоставление ресурсов обработки и управление ими, чтобы минимизировать задержку и максимально повысить эффективность использования, поэтому вам не нужно разворачивать экземпляры или резервировать их вручную. Разделение работы также автоматизировано и оптимизировано для динамической балансировки отстающей работы. Нет необходимости искать «горячие клавиши» или предварительно обрабатывать входные данные.

Горизонтальное автомасштабирование

Горизонтальное автоматическое масштабирование рабочих ресурсов для оптимальной пропускной способности приводит к улучшению общего соотношения цены и производительности.

Гибкое планирование ресурсов для пакетной обработки

Для обработки с гибким временем планирования заданий, например заданий в ночное время, гибкое планирование ресурсов (FlexRS) предлагает более низкую цену за пакетную обработку. Эти гибкие задания помещаются в очередь с гарантией, что они будут получены для выполнения в течение шести часов.

Это руководство адаптировано с https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven.

Что вы узнаете

  • Как создать проект Maven с помощью Apache Beam, используя Java SDK
  • Запустите пример конвейера с помощью консоли Google Cloud Platform.
  • Как удалить связанную корзину Cloud Storage и ее содержимое

Что вам понадобится

Как вы будете использовать этот урок?

Прочтите только до конца Прочитайте его и выполните упражнения.

Как бы вы оценили свой опыт использования сервисов Google Cloud Platform?

Новичок Средний Опытный

2. Настройка и требования

Самостоятельная настройка среды

  1. Войдите в Cloud Console и создайте новый проект или повторно используйте существующий. (Если у вас еще нет учетной записи Gmail или G Suite, вам необходимо ее создать .)

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

Запомните идентификатор проекта — уникальное имя для всех проектов Google Cloud (имя, указанное выше, уже занято и не подойдет вам, извините!). Позже в этой лаборатории он будет называться PROJECT_ID .

  1. Далее вам необходимо включить биллинг в Cloud Console, чтобы использовать ресурсы Google Cloud.

Прохождение этой лаборатории кода не должно стоить много, если вообще стоит. Обязательно следуйте всем инструкциям в разделе «Очистка», в которых рассказывается, как отключить ресурсы, чтобы вам не приходилось нести расходы, выходящие за рамки этого руководства. Новые пользователи Google Cloud имеют право на участие в программе бесплатной пробной версии стоимостью 300 долларов США .

Включите API

Нажмите на значок меню в левом верхнем углу экрана.

2bfc27ef9ba2ec7d.png

В раскрывающемся списке выберите API и службы > Панель мониторинга .

5b65523a6cc0afa6.png

Выберите + Включить API и службы.

81ed72192c0edd96.png

Найдите «Computer Engine» в поле поиска. Нажмите «Compute Engine API» в появившемся списке результатов.

3f201e991c7b4527.png

На странице Google Compute Engine нажмите «Включить» .

ac121653277fa7bb.png

После включения нажмите стрелку, чтобы вернуться назад.

Теперь найдите следующие API и включите их:

  • Облачный поток данных
  • Стекдрайвер
  • Облачное хранилище
  • Облачное хранилище JSON
  • Большой запрос
  • Облачный паб/саб
  • Облачное хранилище данных
  • API-интерфейсы диспетчера облачных ресурсов

3. Создайте новую корзину Cloud Storage.

В консоли Google Cloud Platform щелкните значок меню в левом верхнем углу экрана:

2bfc27ef9ba2ec7d.png

Прокрутите вниз и в подразделе «Хранилище» выберите «Облачное хранилище» > «Браузер» :

2b6c3a2a92b47015.png

Теперь вы должны увидеть браузер Cloud Storage, и, если вы используете проект, в котором в настоящее время нет сегментов Cloud Storage, вы увидите приглашение создать новый сегмент. Нажмите кнопку «Создать сегмент» , чтобы создать его:

a711016d5a99dc37.png

Введите имя для своего сегмента. Как отмечается в диалоговом окне, имена сегментов должны быть уникальными во всем облачном хранилище. Поэтому, если вы выберете очевидное имя, например «тест», вы, вероятно, обнаружите, что кто-то другой уже создал корзину с таким именем, и получите сообщение об ошибке.

Существуют также некоторые правила относительно того, какие символы разрешены в именах сегментов. Если вы начинаете и заканчиваете имя корзины буквой или цифрой и используете только тире посередине, то все будет в порядке. Если вы попытаетесь использовать специальные символы или попытаетесь начать или закончить имя корзины чем-то, кроме буквы или цифры, диалоговое окно напомнит вам о правилах.

3a5458648cfe3358.png

Введите уникальное имя для своего сегмента и нажмите «Создать» . Если вы выберете что-то, что уже используется, вы увидите сообщение об ошибке, показанное выше. Когда вы успешно создадите корзину, вы попадете в новую, пустую корзину в браузере:

3bda986ae88c4e71.png

Имя сегмента, которое вы видите, конечно, будет другим, поскольку оно должно быть уникальным для всех проектов.

4. Запустите Cloud Shell.

Активировать Cloud Shell

  1. В Cloud Console нажмите «Активировать Cloud Shell». H7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlannQF0iBuUyfp1RzVad_4nCa0Zz5LtwBlUZFXFCWFrmrWZLqg1MkZz2LdgUDQ .

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWapJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecH rbzQ

Если вы никогда раньше не запускали Cloud Shell, вам будет представлен промежуточный экран (ниже сгиба) с описанием того, что это такое. В этом случае нажмите «Продолжить» (и вы больше никогда его не увидите). Вот как выглядит этот одноразовый экран:

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

Подготовка и подключение к Cloud Shell займет всего несколько минут.

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbha

Эта виртуальная машина оснащена всеми необходимыми инструментами разработки. Он предлагает постоянный домашний каталог объемом 5 ГБ и работает в Google Cloud, что значительно повышает производительность сети и аутентификацию. Большую часть, если не всю, работу в этой лаборатории кода можно выполнить с помощью просто браузера или Chromebook.

После подключения к Cloud Shell вы увидите, что вы уже прошли аутентификацию и что для проекта уже установлен идентификатор вашего проекта.

  1. Выполните следующую команду в Cloud Shell, чтобы подтвердить, что вы прошли аутентификацию:
gcloud auth list

Вывод команды

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

Вывод команды

[core]
project = <PROJECT_ID>

Если это не так, вы можете установить его с помощью этой команды:

gcloud config set project <PROJECT_ID>

Вывод команды

Updated property [core/project].

5. Создайте проект Maven.

После запуска Cloud Shell давайте начнем с создания проекта Maven с использованием Java SDK для Apache Beam.

Apache Beam — это модель программирования с открытым исходным кодом для конвейеров данных. Вы определяете эти конвейеры с помощью программы Apache Beam и можете выбрать исполнитель, например Dataflow, для выполнения вашего конвейера.

Запустите mvn archetype:generate в своей оболочке следующим образом:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

После запуска команды вы должны увидеть новый каталог с именем first-dataflow в вашем текущем каталоге. first-dataflow содержит проект Maven, который включает SDK Cloud Dataflow для Java и примеры конвейеров.

6. Запустите конвейер обработки текста в Cloud Dataflow.

Начнем с сохранения идентификатора нашего проекта и имен сегментов Cloud Storage в качестве переменных среды. Вы можете сделать это в Cloud Shell. Обязательно замените <your_project_id> собственным идентификатором проекта.

 export PROJECT_ID=<your_project_id>

Теперь мы сделаем то же самое для сегмента Cloud Storage. Не забудьте заменить <your_bucket_name> уникальным именем, которое вы использовали для создания корзины на предыдущем этапе.

 export BUCKET_NAME=<your_bucket_name>

Перейдите в каталог first-dataflow/ .

 cd first-dataflow

Мы собираемся запустить конвейер под названием WordCount, который считывает текст, разбивает текстовые строки на отдельные слова и выполняет подсчет частоты каждого из этих слов. Сначала мы запустим конвейер, и пока он работает, посмотрим, что происходит на каждом этапе.

Запустите конвейер, выполнив команду mvn compile exec:java в оболочке или окне терминала. Для аргументов --project, --stagingLocation, и --output команда ниже ссылается на переменные среды, которые вы установили ранее на этом шаге.

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

Пока задание выполняется, давайте найдем его в списке заданий.

Откройте веб-интерфейс Cloud Dataflow в консоли Google Cloud Platform . Вы должны увидеть свое задание по подсчету слов со статусом « Выполняется »:

3623be74922e3209.png

Теперь давайте посмотрим на параметры конвейера. Начните с нажатия на название вашей вакансии:

816d8f59c72797d7.png

При выборе задания вы можете просмотреть график выполнения . Граф выполнения конвейера представляет каждое преобразование в конвейере в виде поля, содержащего имя преобразования и некоторую информацию о состоянии. Вы можете нажать на карат в правом верхнем углу каждого шага, чтобы увидеть более подробную информацию:

80a972dd19a6f1eb.png

Давайте посмотрим, как конвейер преобразует данные на каждом этапе:

  • Чтение : на этом этапе конвейер считывает данные из источника входных данных. В данном случае это текстовый файл из Cloud Storage, содержащий весь текст пьесы Шекспира «Король Лир» . Наш конвейер считывает файл построчно и выводит каждый из них PCollection , где каждая строка нашего текстового файла является элементом коллекции.
  • CountWords : шаг CountWords состоит из двух частей. Во-первых, он использует параллельную функцию do (ParDo) с именем ExtractWords для разбивки каждой строки на отдельные слова. Результатом ExtractWords является новая коллекция PCollection, где каждый элемент представляет собой слово. Следующий шаг, Count , использует преобразование, предоставляемое Java SDK, которое возвращает пары ключ-значение, где ключ — это уникальное слово, а значение — это количество раз, которое оно встречается. Вот метод, реализующий CountWords , и вы можете просмотреть полный файл WordCount.java на GitHub :
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements : вызывает FormatAsTextFn , скопированный ниже, который форматирует каждую пару ключ-значение в печатную строку.
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts : на этом этапе мы записываем строки, пригодные для печати, в несколько сегментированных текстовых файлов.

Через несколько минут мы посмотрим на результат работы конвейера.

Теперь взгляните на страницу информации о задании справа от графика, которая включает параметры конвейера, которые мы включили в команду mvn compile exec:java .

9723815a1f5bf08b.png

208a7f0d6973acf6.png

Вы также можете просмотреть пользовательские счетчики для конвейера, которые в данном случае показывают, сколько пустых строк было обнаружено во время выполнения. Вы можете добавить в свой конвейер новые счетчики, чтобы отслеживать метрики, специфичные для приложения.

a2e2800e2c6893f8.png

Вы можете щелкнуть значок «Журналы» в нижней части консоли, чтобы просмотреть конкретные сообщения об ошибках.

23c64138a1027f8.png

По умолчанию на панели отображаются сообщения журнала заданий, в которых сообщается о состоянии задания в целом. Вы можете использовать переключатель «Минимальная серьезность», чтобы фильтровать сообщения о ходе выполнения задания и статусе.

94ba42015fdafbe2.png

Выбор шага конвейера на графике изменяет представление журналов, созданных вашим кодом, и сгенерированного кода, выполняющегося на этапе конвейера.

Чтобы вернуться к журналам заданий, отмените выбор этого шага, щелкнув за пределами графика или нажав кнопку «Закрыть» на правой боковой панели.

Вы можете использовать кнопку «Журналы рабочих» на вкладке «Журналы», чтобы просмотреть журналы рабочих экземпляров Compute Engine, на которых работает ваш конвейер. Журналы рабочих состоят из строк журнала, созданных вашим кодом, и кода, созданного потоком данных, выполняющего его.

Если вы пытаетесь отладить сбой в конвейере, часто в рабочих журналах создается дополнительная запись, которая помогает решить проблему. Имейте в виду, что эти журналы агрегируются по всем работникам, и их можно фильтровать и искать.

5a53c244f28d5478.png

Интерфейс мониторинга потока данных показывает только самые последние сообщения журнала. Вы можете просмотреть все журналы, щелкнув ссылку Google Cloud Observability в правой части панели журналов.

2bc704a4d6529b31.png

Вот краткая информация о различных типах журналов, доступных для просмотра на странице «Мониторинг → Журналы»:

  • Журналы сообщений заданий содержат сообщения уровня заданий, которые генерируют различные компоненты Dataflow. Примеры включают конфигурацию автоматического масштабирования, когда рабочие запускаются или завершают работу, прогресс на этапе задания и ошибки задания. Ошибки рабочего уровня, возникающие из-за сбоя пользовательского кода и присутствующие в журналах рабочих процессов , также распространяются на журналы сообщений заданий .
  • рабочие журналы создаются работниками Dataflow. Воркеры выполняют большую часть конвейерной работы (например, применяют ваши ParDos к данным). Журналы рабочих содержат сообщения, зарегистрированные вашим кодом и потоком данных.
  • Журналы запуска рабочих присутствуют в большинстве заданий Dataflow и могут фиксировать сообщения, связанные с процессом запуска. Процесс запуска включает загрузку jar-файлов задания из Cloud Storage, а затем запуск рабочих процессов. Если возникла проблема с запуском рабочих процессов, можно поискать эти журналы.
  • Журналы shuffler содержат сообщения от рабочих процессов, которые объединяют результаты параллельных операций конвейера.
  • Журналы docker и kubelet содержат сообщения, относящиеся к этим общедоступным технологиям, которые используются в работниках Dataflow.

На следующем этапе мы проверим, что ваша работа выполнена успешно.

7. Убедитесь, что ваша работа выполнена успешно

Откройте веб-интерфейс Cloud Dataflow в консоли Google Cloud Platform .

Вы должны увидеть свое задание по подсчету слов сначала со статусом « Выполняется» , а затем «Успешно» :

4c408162416d03a2.png

Выполнение задания займет примерно 3-4 минуты.

Помните, когда вы запускали конвейер и указывали выходной сегмент? Давайте посмотрим на результат (разве вы не хотите узнать, сколько раз встречалось каждое слово в «Короле Лире» ?!). Вернитесь в браузер облачного хранилища в консоли Google Cloud Platform. В своем сегменте вы должны увидеть выходные файлы и промежуточные файлы, созданные вашим заданием:

25a5d3d4b5d0b567.png

8. Отключите свои ресурсы

Вы можете отключить свои ресурсы из консоли Google Cloud Platform .

Откройте браузер Cloud Storage в консоли Google Cloud Platform.

2b6c3a2a92b47015.png

Установите флажок рядом с созданным вами сегментом и нажмите «УДАЛИТЬ» , чтобы окончательно удалить сегмент и его содержимое.

2f7780bdf10b69ba.png

8051ef293a8e5cfe.png

9. Поздравляем!

Вы узнали, как создать проект Maven с помощью SDK Cloud Dataflow, запустить пример конвейера с помощью консоли Google Cloud Platform и удалить связанный сегмент Cloud Storage и его содержимое.

Узнать больше

Лицензия

Эта работа распространяется под лицензией Creative Commons Attribution 3.0 Generic License и лицензией Apache 2.0.