1. Введение
В этом практическом занятии вы изучите преимущества повышения производительности собственного механизма выполнения Apache Spark — Lightning Engine — в Google Cloud Serverless for Apache Spark , а также рассмотрите, как он оптимизирует ваши рабочие нагрузки Spark на Serverless for Apache Spark .
Lightning Engine использует Velox и Apache Gluten . Velox — это высокопроизводительный движок обработки данных на C++. Apache Gluten — это промежуточный слой, отвечающий за преобразование заданий Spark на основе JVM в код C++, который может быть выполнен Velox.
В этой демонстрации используется TPC-DS , отраслевой стандартный бенчмарк, предназначенный для оценки производительности систем поддержки принятия решений. Вы запустите базовое задание PySpark для запроса к образцу набора данных TPC-DS, используя стандартный бессерверный уровень. Затем вы запустите точно такое же задание, используя уровень Premium с включенным Lightning Engine. Наконец, вы сравните время выполнения и перейдете к пользовательскому интерфейсу Spark, чтобы визуализировать разницу в графиках выполнения Spark с аппаратным ускорением.
Ориентировочная стоимость проведения этого практического занятия составляет менее 1 доллара США , при условии оперативного освобождения ресурсов, как описано в разделе «Очистка ресурсов».
Что вы будете делать
- Создайте хранилище Cloud Storage для хранения ваших скриптов и результатов бенчмаркинга.
- Выполните базовую задачу обработки данных PySpark, используя уровень Serverless for Apache Spark Standard.
- Выполните ту же задачу, используя уровень Serverless for Apache Spark Premium с Lightning Engine.
- Сравните показатели времени выполнения.
- Запустите пользовательский интерфейс Spark History Server, чтобы сравнить графики физического выполнения, полученные непосредственно в Spark.
Что вам понадобится
- Веб-браузер, например Chrome.
- Проект Google Cloud с включенной функцией выставления счетов.
- Базовые знания Apache Spark и командной строки Linux.
2. Прежде чем начать
Создайте проект в Google Cloud.
- В консоли Google Cloud на странице выбора проекта выберите или создайте проект Google Cloud .
- Убедитесь, что для вашего облачного проекта включена функция выставления счетов. Узнайте, как проверить, включена ли функция выставления счетов для проекта .
Запустить Cloud Shell
Cloud Shell — это среда командной строки, работающая в Google Cloud и поставляемая с предустановленными необходимыми инструментами.
- В верхней части консоли Google Cloud нажмите кнопку «Активировать Cloud Shell» .
- После подключения к Cloud Shell подтвердите свою аутентификацию:
gcloud auth list - Убедитесь, что ваш проект настроен:
gcloud config get project - Если параметры вашего проекта заданы не так, как ожидалось, настройте их следующим образом:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Включить API
Выполните эту команду, чтобы включить все необходимые API для данного практического занятия:
gcloud services enable \
dataproc.googleapis.com \
storage.googleapis.com \
compute.googleapis.com
3. Подготовьте окружающую среду
На этом шаге вы инициализируете переменные среды и создадите хранилище Cloud Storage. В этом хранилище будет храниться скрипт PySpark, который вы отправляете на оба уровня Serverless for Apache Spark.
Установка переменных среды
Выполните следующие команды в Cloud Shell, чтобы установить переменные среды по умолчанию. Мы будем использовать регион us-central1 , но вы можете изменить его по своему усмотрению.
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"
gcloud config set dataproc/region ${REGION}
Создайте корзину облачного хранилища.
Создайте хранилище для ваших скриптов и логов:
gcloud storage buckets create gs://${BUCKET_NAME} \
--uniform-bucket-level-access \
--location=${REGION}
Скопируйте набор данных TPC-DS в свой собственный сегмент.
На этом шаге вы скопируете набор данных TPC-DS из общедоступного хранилища в собственное хранилище Cloud Storage. Это гарантирует, что ваши задания PySpark смогут считывать данные локально из вашего проекта.
Задайте переменные среды, чтобы выбрать размер и тип набора данных:
export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB" # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)
export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"
Скопируйте данные TPC-DS в свой собственный контейнер:
gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/
Создайте скрипт для тестирования производительности PySpark.
Мы будем использовать скрипт PySpark, который регистрирует стандартные таблицы TPC-DS из вашего хранилища Cloud Storage и выполняет 5 стандартных запросов, полученных из общедоступного репозитория Apache Spark. Скрипт принимает путь к вашему набору данных в качестве аргумента.
Создайте в Cloud Shell файл с именем benchmark.py . Для создания файла можно скопировать и вставить следующую команду:
cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time
def main():
parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
args = parser.parse_args()
base_path = args.data_path
# Initialize Spark Session
spark = SparkSession.builder \
.appName("TPC-DS Benchmark") \
.getOrCreate()
print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")
# List of all 24 TPC-DS tables
tables = [
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
"customer", "customer_address", "customer_demographics", "date_dim",
"household_demographics", "income_band", "inventory", "item",
"promotion", "reason", "ship_mode", "store", "store_returns",
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
"web_sales", "web_site"
]
# Register each table as a temporary view
# For this subset of queries, not every table is used
for table in tables:
path = f"{base_path}/{table}"
try:
df = spark.read.parquet(path)
df.createOrReplaceTempView(table)
except Exception as e:
print(f"Warning: Could not load table {table} from {path}. Error: {e}")
print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")
# Standard TPC-DS Queries sourced from Apache Spark public repository:
# https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
queries = {
"Q1": """
WITH customer_total_return AS (
SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
sum(sr_return_amt) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2000
GROUP BY sr_customer_sk, sr_store_sk
)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (
SELECT avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
""",
"Q2": """
WITH wscs AS (
SELECT sold_date_sk, sales_price
FROM (
SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
FROM web_sales
UNION ALL
SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
FROM catalog_sales
)
),
wswscs AS (
SELECT d_week_seq,
sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
FROM wscs, date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq
)
SELECT d_week_seq1,
round(sun_sales1/sun_sales2, 2),
round(mon_sales1/mon_sales2, 2),
round(tue_sales1/tue_sales2, 2),
round(wed_sales1/wed_sales2, 2),
round(thu_sales1/thu_sales2, 2),
round(fri_sales1/fri_sales2, 2),
round(sat_sales1/sat_sales2, 2)
FROM (
SELECT wswscs.d_week_seq AS d_week_seq1,
sun_sales AS sun_sales1, mon_sales AS mon_sales1,
tue_sales AS tue_sales1, wed_sales AS wed_sales1,
thu_sales AS thu_sales1, fri_sales AS fri_sales1,
sat_sales AS sat_sales1
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001
) y,
(
SELECT wswscs.d_week_seq AS d_week_seq2,
sun_sales AS sun_sales2, mon_sales AS mon_sales2,
tue_sales AS tue_sales2, wed_sales AS wed_sales2,
thu_sales AS thu_sales2, fri_sales AS fri_sales2,
sat_sales AS sat_sales2
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001 + 1
) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1
""",
"Q3": """
SELECT dt.d_year,
item.i_brand_id AS brand_id,
item.i_brand AS brand,
sum(ss_ext_sales_price) AS sum_agg
FROM date_dim dt,
store_sales,
item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 436
AND dt.d_moy = 12
GROUP BY dt.d_year,
item.i_brand,
item.i_brand_id
ORDER BY dt.d_year,
sum_agg DESC,
brand_id
LIMIT 100
""",
"Q7": """
SELECT i_item_id,
avg(ss_quantity) AS agg1,
avg(ss_list_price) AS agg2,
avg(ss_coupon_amt) AS agg3,
avg(ss_sales_price) AS agg4
FROM store_sales,
customer_demographics,
date_dim,
item,
promotion
WHERE ss_sold_date_sk = d_date_sk
AND ss_item_sk = i_item_sk
AND ss_cdemo_sk = cd_demo_sk
AND ss_promo_sk = p_promo_sk
AND cd_gender = 'M'
AND cd_marital_status = 'S'
AND cd_education_status = 'College'
AND (p_channel_email = 'N' OR p_channel_event = 'N')
AND d_year = 2000
GROUP BY i_item_id
ORDER BY i_item_id
LIMIT 100
""",
"Q19": """
SELECT i_item_id,
i_brand,
i_category,
i_class,
i_manufact,
sum(ss_ext_sales_price) AS sales,
sum(ss_net_profit) AS profit
FROM date_dim,
store_sales,
item,
customer,
store
WHERE d_date_sk = ss_sold_date_sk
AND i_item_sk = ss_item_sk
AND d_year = 2000
AND d_moy = 12
AND c_customer_sk = ss_customer_sk
AND s_store_sk = ss_store_sk
AND i_manager_id = 9
GROUP BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
ORDER BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
LIMIT 100
"""
}
total_start_time = time.time()
for query_name, query_sql in queries.items():
print(f"\nExecuting {query_name}...")
query_start = time.time()
# Execute query and force action using show()
result_df = spark.sql(query_sql)
result_df.show(5) # Show top 5 rows
query_end = time.time()
print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")
total_end_time = time.time()
print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")
spark.stop()
if __name__ == "__main__":
main()
EOF
Скопируйте скрипт в свой сегмент Cloud Storage, чтобы Serverless for Apache Spark мог получить к нему доступ:
gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py
4. Запустите базовое задание бессерверной архитектуры.
Для проведения базового сравнения без использования Lightning Engine, отправьте загруженное ранее задание PySpark для бенчмаркинга на уровень Serverless for Apache Spark Standard. В качестве аргумента мы передадим путь к скопированному вами набору данных.
Для выполнения пакетного задания выполните следующую команду:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
-- ${DATASET_PATH}
Контроль за выполнением задания
Во время выполнения задачи вы будете видеть поток логов PySpark в терминале Cloud Shell. Serverless for Apache Spark выделяет контейнеры, считывает набор данных TPC-DS Parquet из Cloud Storage и выполняет сложные SQL-запросы.
После завершения выполнения скрипта просмотрите вывод в консоли. Вы должны увидеть результаты и время выполнения каждого стандартного запроса, примерно следующее:
... Executing Q1... +-------------+ |c_customer_id| +-------------+ ... Q1 completed in 18.52 seconds. ... All benchmark queries completed in 110.94 seconds.
Обратите внимание на общее время выполнения в секундах. Это ваше базовое время выполнения .
5. Запуск с использованием Serverless Premium и Lightning Engine.
Далее вы запустите точно такое же задание Spark на Serverless для Apache Spark , но используя уровень Premium и включив собственный векторизованный механизм запросов Google: Lightning Engine .
Отправьте задание на тестирование производительности в Serverless, явно включив Lightning Engine:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
--properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
-- ${DATASET_PATH}
Сравните результаты
Дождитесь завершения задания и изучите результат. Вы должны увидеть те же результаты запроса. Внимательно посмотрите на время выполнения:
... All benchmark queries completed in 64.24 seconds.
Сравнивая базовый вариант Serverless с вариантом Serverless Lightning Engine, вы заметите, что Lightning Engine выполняет группировку, агрегацию и объединение данных быстрее, используя собственный слой выполнения на C++ и векторизованную обработку на бэкэнде, без каких-либо изменений в коде вашего приложения PySpark.
Lightning Engine оптимизирован для повышения производительности при больших объемах рабочей нагрузки. В этом примере мы используем небольшой набор данных, поэтому прирост производительности не так значителен, как мог бы быть. На наборе данных объемом 10 ТБ в бенчмарках было показано улучшение производительности до 4,3 раз по сравнению с открытым исходным кодом Spark.
6. Сравните графы выполнения в пользовательском интерфейсе Spark.
Сокращение времени выполнения впечатляет, но давайте заглянем под капот и посмотрим, что на самом деле делает Spark во время выполнения запроса. Это можно сделать, изучив графики выполнения Spark UI для обеих задач.
- Откройте консоль Google Cloud в своем браузере.
- Перейдите в Dataproc > Пакеты .
- В списке вы увидите две группы: стандартную базовую версию и версию для уровня Premium.
- Щелкните по пакету данных уровня Premium, который вы запускали, затем щелкните «Просмотреть пользовательский интерфейс Spark» , а затем «Просмотреть подробности» .
- В пользовательском интерфейсе Spark перейдите на вкладку «Задания» .
- В разделе «Выполненные задания» в поле поиска введите
Velox. - Вы часто будете встречать описания заданий, в которых упоминается
VeloxSparkPlanExecApi. Это относится к собственному механизму выполнения Velox, используемому Lightning Engine.
Теперь повторите этот процесс для стандартного уровня:
- Вернитесь на страницу «Бессерверные вычисления для Apache Spark пакетной обработки».
- Щелкните ссылку для пакета стандартного уровня , затем щелкните «Просмотреть пользовательский интерфейс Spark» , а затем «Просмотреть подробности» .
- В пользовательском интерфейсе Spark перейдите на вкладку «Задания» .
- В разделе «Выполненные задания» в поле поиска введите
Velox. - В описаниях вакансий вы не найдете упоминания API Velox.
7. Уборка
Чтобы избежать дальнейших списаний средств с вашего аккаунта Google Cloud, удалите ресурсы, созданные в ходе этого практического занятия.
В Cloud Shell удалите сегмент Cloud Storage и его содержимое:
gcloud storage rm -r gs://${BUCKET_NAME}
Удалите локальную копию файла benchmark.py :
rm benchmark.py
8. Поздравляем!
Поздравляем! Вы успешно создали среду для сравнительного анализа производительности Apache Spark и сравнили Serverless для Apache Spark Standard с Serverless для Apache Spark Premium.
Вы воочию убедились, как включение бессерверной архитектуры для нового механизма Lightning Engine в Apache Spark может сократить время выполнения вашей рабочей нагрузки Spark, и изучили пользовательский интерфейс Spark, чтобы увидеть, как физический граф выполнения преобразуется в нативный код C++ с помощью механизма Native Query Engine.
Что вы узнали
- Как написать скрипт для сравнительного анализа производительности набора данных PySpark.
- Как отправлять задания Spark в Serverless для Apache Spark.
- Как включить Lightning Engine.
- Как сравнивать планы заданий в пользовательском интерфейсе Spark.