1. Обзор
Что такое поток данных?
Dataflow — это управляемая служба для выполнения самых разных шаблонов обработки данных. В документации на этом сайте показано, как развернуть конвейеры пакетной и потоковой обработки данных с помощью Dataflow, включая инструкции по использованию функций службы.
Apache Beam SDK — это модель программирования с открытым исходным кодом, которая позволяет разрабатывать как пакетные, так и потоковые конвейеры. Вы создаете свои конвейеры с помощью программы Apache Beam, а затем запускаете их в службе Dataflow. Документация Apache Beam содержит подробную концептуальную информацию и справочные материалы по модели программирования Apache Beam, пакетам SDK и другим средам выполнения.
Потоковая аналитика данных с высокой скоростью
Dataflow обеспечивает быструю и упрощенную разработку конвейера потоковых данных с меньшей задержкой данных.
Упрощение операций и управления
Позвольте командам сосредоточиться на программировании, а не на управлении кластерами серверов, поскольку бессерверный подход Dataflow устраняет операционные накладные расходы при рабочих нагрузках по обработке данных.
Снижение совокупной стоимости владения
Автоматическое масштабирование ресурсов в сочетании с возможностями пакетной обработки с оптимизацией затрат означает, что Dataflow предлагает практически безграничные возможности для управления сезонными и скачкообразными рабочими нагрузками без перерасхода средств.
Ключевые особенности
Автоматизированное управление ресурсами и динамическая ребалансировка работы
Dataflow автоматизирует предоставление ресурсов обработки и управление ими, чтобы минимизировать задержку и максимально повысить эффективность использования, поэтому вам не нужно разворачивать экземпляры или резервировать их вручную. Разделение работы также автоматизировано и оптимизировано для динамической балансировки отстающей работы. Нет необходимости искать «горячие клавиши» или предварительно обрабатывать входные данные.
Горизонтальное автомасштабирование
Горизонтальное автоматическое масштабирование рабочих ресурсов для оптимальной пропускной способности приводит к улучшению общего соотношения цены и производительности.
Гибкое планирование ресурсов для пакетной обработки
Для обработки с гибким временем планирования заданий, например заданий в ночное время, гибкое планирование ресурсов (FlexRS) предлагает более низкую цену за пакетную обработку. Эти гибкие задания помещаются в очередь с гарантией, что они будут получены для выполнения в течение шести часов.
Это руководство адаптировано с https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven.
Что вы узнаете
- Как создать проект Maven с помощью Apache Beam, используя Java SDK
- Запустите пример конвейера с помощью консоли Google Cloud Platform.
- Как удалить связанную корзину Cloud Storage и ее содержимое
Что вам понадобится
Как вы будете использовать этот урок?
Как бы вы оценили свой опыт использования сервисов Google Cloud Platform?
2. Настройка и требования
Самостоятельная настройка среды
- Войдите в Cloud Console и создайте новый проект или повторно используйте существующий. (Если у вас еще нет учетной записи Gmail или G Suite, вам необходимо ее создать .)
Запомните идентификатор проекта — уникальное имя для всех проектов Google Cloud (имя, указанное выше, уже занято и не подойдет вам, извините!). Позже в этой лаборатории он будет называться PROJECT_ID
.
- Далее вам необходимо включить биллинг в Cloud Console, чтобы использовать ресурсы Google Cloud.
Прохождение этой лаборатории кода не должно стоить много, если вообще стоит. Обязательно следуйте всем инструкциям в разделе «Очистка», в которых рассказывается, как отключить ресурсы, чтобы вам не приходилось нести расходы, выходящие за рамки этого руководства. Новые пользователи Google Cloud имеют право на участие в программе бесплатной пробной версии стоимостью 300 долларов США .
Включите API
Нажмите на значок меню в левом верхнем углу экрана.
В раскрывающемся списке выберите API и службы > Панель мониторинга .
Выберите + Включить API и службы.
Найдите «Computer Engine» в поле поиска. Нажмите «Compute Engine API» в появившемся списке результатов.
На странице Google Compute Engine нажмите «Включить» .
После включения нажмите стрелку, чтобы вернуться назад.
Теперь найдите следующие API и включите их:
- Облачный поток данных
- Стекдрайвер
- Облачное хранилище
- Облачное хранилище JSON
- Большой запрос
- Облачный паб/саб
- Облачное хранилище данных
- API-интерфейсы диспетчера облачных ресурсов
3. Создайте новую корзину Cloud Storage.
В консоли Google Cloud Platform щелкните значок меню в левом верхнем углу экрана:
Прокрутите вниз и в подразделе «Хранилище» выберите «Облачное хранилище» > «Браузер» :
Теперь вы должны увидеть браузер Cloud Storage, и, если вы используете проект, в котором в настоящее время нет сегментов Cloud Storage, вы увидите приглашение создать новый сегмент. Нажмите кнопку «Создать сегмент» , чтобы создать его:
Введите имя для своего сегмента. Как отмечается в диалоговом окне, имена сегментов должны быть уникальными во всем облачном хранилище. Поэтому, если вы выберете очевидное имя, например «тест», вы, вероятно, обнаружите, что кто-то другой уже создал корзину с таким именем, и получите сообщение об ошибке.
Существуют также некоторые правила относительно того, какие символы разрешены в именах сегментов. Если вы начинаете и заканчиваете имя корзины буквой или цифрой и используете только тире посередине, то все будет в порядке. Если вы попытаетесь использовать специальные символы или попытаетесь начать или закончить имя корзины чем-то, кроме буквы или цифры, диалоговое окно напомнит вам о правилах.
Введите уникальное имя для своего сегмента и нажмите «Создать» . Если вы выберете что-то, что уже используется, вы увидите сообщение об ошибке, показанное выше. Когда вы успешно создадите корзину, вы попадете в новую, пустую корзину в браузере:
Имя сегмента, которое вы видите, конечно, будет другим, поскольку оно должно быть уникальным для всех проектов.
4. Запустите Cloud Shell.
Активировать Cloud Shell
- В Cloud Console нажмите «Активировать Cloud Shell».
.
Если вы никогда раньше не запускали Cloud Shell, вам будет представлен промежуточный экран (ниже сгиба) с описанием того, что это такое. В этом случае нажмите «Продолжить» (и вы больше никогда его не увидите). Вот как выглядит этот одноразовый экран:
Подготовка и подключение к Cloud Shell займет всего несколько минут.
Эта виртуальная машина оснащена всеми необходимыми инструментами разработки. Он предлагает постоянный домашний каталог объемом 5 ГБ и работает в Google Cloud, что значительно повышает производительность сети и аутентификацию. Большую часть, если не всю, работу в этой лаборатории кода можно выполнить с помощью просто браузера или Chromebook.
После подключения к Cloud Shell вы увидите, что вы уже прошли аутентификацию и что для проекта уже установлен идентификатор вашего проекта.
- Выполните следующую команду в Cloud Shell, чтобы подтвердить, что вы прошли аутентификацию:
gcloud auth list
Вывод команды
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
gcloud config list project
Вывод команды
[core] project = <PROJECT_ID>
Если это не так, вы можете установить его с помощью этой команды:
gcloud config set project <PROJECT_ID>
Вывод команды
Updated property [core/project].
5. Создайте проект Maven.
После запуска Cloud Shell давайте начнем с создания проекта Maven с использованием Java SDK для Apache Beam.
Apache Beam — это модель программирования с открытым исходным кодом для конвейеров данных. Вы определяете эти конвейеры с помощью программы Apache Beam и можете выбрать исполнитель, например Dataflow, для выполнения вашего конвейера.
Запустите mvn archetype:generate
в своей оболочке следующим образом:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
После запуска команды вы должны увидеть новый каталог с именем first-dataflow
в вашем текущем каталоге. first-dataflow
содержит проект Maven, который включает SDK Cloud Dataflow для Java и примеры конвейеров.
6. Запустите конвейер обработки текста в Cloud Dataflow.
Начнем с сохранения идентификатора нашего проекта и имен сегментов Cloud Storage в качестве переменных среды. Вы можете сделать это в Cloud Shell. Обязательно замените <your_project_id>
собственным идентификатором проекта.
export PROJECT_ID=<your_project_id>
Теперь мы сделаем то же самое для сегмента Cloud Storage. Не забудьте заменить <your_bucket_name>
уникальным именем, которое вы использовали для создания корзины на предыдущем этапе.
export BUCKET_NAME=<your_bucket_name>
Перейдите в каталог first-dataflow/
.
cd first-dataflow
Мы собираемся запустить конвейер под названием WordCount, который считывает текст, разбивает текстовые строки на отдельные слова и выполняет подсчет частоты каждого из этих слов. Сначала мы запустим конвейер, и пока он работает, посмотрим, что происходит на каждом этапе.
Запустите конвейер, выполнив команду mvn compile exec:java
в оболочке или окне терминала. Для аргументов --project, --stagingLocation,
и --output
команда ниже ссылается на переменные среды, которые вы установили ранее на этом шаге.
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
Пока задание выполняется, давайте найдем его в списке заданий.
Откройте веб-интерфейс Cloud Dataflow в консоли Google Cloud Platform . Вы должны увидеть свое задание по подсчету слов со статусом « Выполняется »:
Теперь давайте посмотрим на параметры конвейера. Начните с нажатия на название вашей вакансии:
При выборе задания вы можете просмотреть график выполнения . Граф выполнения конвейера представляет каждое преобразование в конвейере в виде поля, содержащего имя преобразования и некоторую информацию о состоянии. Вы можете нажать на карат в правом верхнем углу каждого шага, чтобы увидеть более подробную информацию:
Давайте посмотрим, как конвейер преобразует данные на каждом этапе:
- Чтение : на этом этапе конвейер считывает данные из источника входных данных. В данном случае это текстовый файл из Cloud Storage, содержащий весь текст пьесы Шекспира «Король Лир» . Наш конвейер считывает файл построчно и выводит каждый из них
PCollection
, где каждая строка нашего текстового файла является элементом коллекции. - CountWords : шаг
CountWords
состоит из двух частей. Во-первых, он использует параллельную функцию do (ParDo) с именемExtractWords
для разбивки каждой строки на отдельные слова. Результатом ExtractWords является новая коллекция PCollection, где каждый элемент представляет собой слово. Следующий шаг,Count
, использует преобразование, предоставляемое Java SDK, которое возвращает пары ключ-значение, где ключ — это уникальное слово, а значение — это количество раз, которое оно встречается. Вот метод, реализующийCountWords
, и вы можете просмотреть полный файл WordCount.java на GitHub :
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements : вызывает
FormatAsTextFn
, скопированный ниже, который форматирует каждую пару ключ-значение в печатную строку.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts : на этом этапе мы записываем строки, пригодные для печати, в несколько сегментированных текстовых файлов.
Через несколько минут мы посмотрим на результат работы конвейера.
Теперь взгляните на страницу информации о задании справа от графика, которая включает параметры конвейера, которые мы включили в команду mvn compile exec:java
.
Вы также можете просмотреть пользовательские счетчики для конвейера, которые в данном случае показывают, сколько пустых строк было обнаружено во время выполнения. Вы можете добавить в свой конвейер новые счетчики, чтобы отслеживать метрики, специфичные для приложения.
Вы можете щелкнуть значок «Журналы» в нижней части консоли, чтобы просмотреть конкретные сообщения об ошибках.
По умолчанию на панели отображаются сообщения журнала заданий, в которых сообщается о состоянии задания в целом. Вы можете использовать переключатель «Минимальная серьезность», чтобы фильтровать сообщения о ходе выполнения задания и статусе.
Выбор шага конвейера на графике изменяет представление журналов, созданных вашим кодом, и сгенерированного кода, выполняющегося на этапе конвейера.
Чтобы вернуться к журналам заданий, отмените выбор этого шага, щелкнув за пределами графика или нажав кнопку «Закрыть» на правой боковой панели.
Вы можете использовать кнопку «Журналы рабочих» на вкладке «Журналы», чтобы просмотреть журналы рабочих экземпляров Compute Engine, на которых работает ваш конвейер. Журналы рабочих состоят из строк журнала, созданных вашим кодом, и кода, созданного потоком данных, выполняющего его.
Если вы пытаетесь отладить сбой в конвейере, часто в рабочих журналах создается дополнительная запись, которая помогает решить проблему. Имейте в виду, что эти журналы агрегируются по всем работникам, и их можно фильтровать и искать.
Интерфейс мониторинга потока данных показывает только самые последние сообщения журнала. Вы можете просмотреть все журналы, щелкнув ссылку Google Cloud Observability в правой части панели журналов.
Вот краткая информация о различных типах журналов, доступных для просмотра на странице «Мониторинг → Журналы»:
- Журналы сообщений заданий содержат сообщения уровня заданий, которые генерируют различные компоненты Dataflow. Примеры включают конфигурацию автоматического масштабирования, когда рабочие запускаются или завершают работу, прогресс на этапе задания и ошибки задания. Ошибки рабочего уровня, возникающие из-за сбоя пользовательского кода и присутствующие в журналах рабочих процессов , также распространяются на журналы сообщений заданий .
- рабочие журналы создаются работниками Dataflow. Воркеры выполняют большую часть конвейерной работы (например, применяют ваши ParDos к данным). Журналы рабочих содержат сообщения, зарегистрированные вашим кодом и потоком данных.
- Журналы запуска рабочих присутствуют в большинстве заданий Dataflow и могут фиксировать сообщения, связанные с процессом запуска. Процесс запуска включает загрузку jar-файлов задания из Cloud Storage, а затем запуск рабочих процессов. Если возникла проблема с запуском рабочих процессов, можно поискать эти журналы.
- Журналы shuffler содержат сообщения от рабочих процессов, которые объединяют результаты параллельных операций конвейера.
- Журналы docker и kubelet содержат сообщения, относящиеся к этим общедоступным технологиям, которые используются в работниках Dataflow.
На следующем этапе мы проверим, что ваша работа выполнена успешно.
7. Убедитесь, что ваша работа выполнена успешно
Откройте веб-интерфейс Cloud Dataflow в консоли Google Cloud Platform .
Вы должны увидеть свое задание по подсчету слов сначала со статусом « Выполняется» , а затем «Успешно» :
Выполнение задания займет примерно 3-4 минуты.
Помните, когда вы запускали конвейер и указывали выходной сегмент? Давайте посмотрим на результат (разве вы не хотите узнать, сколько раз встречалось каждое слово в «Короле Лире» ?!). Вернитесь в браузер облачного хранилища в консоли Google Cloud Platform. В своем сегменте вы должны увидеть выходные файлы и промежуточные файлы, созданные вашим заданием:
8. Отключите свои ресурсы
Вы можете отключить свои ресурсы из консоли Google Cloud Platform .
Откройте браузер Cloud Storage в консоли Google Cloud Platform.
Установите флажок рядом с созданным вами сегментом и нажмите «УДАЛИТЬ» , чтобы окончательно удалить сегмент и его содержимое.
9. Поздравляем!
Вы узнали, как создать проект Maven с помощью SDK Cloud Dataflow, запустить пример конвейера с помощью консоли Google Cloud Platform и удалить связанный сегмент Cloud Storage и его содержимое.
Узнать больше
- Документация по потоку данных: https://cloud.google.com/dataflow/docs/.
Лицензия
Эта работа распространяется под лицензией Creative Commons Attribution 3.0 Generic License и лицензией Apache 2.0.