使用 Serverless for Apache Spark 和 Lightning Engine 加速 Spark

1. 简介

在此 Codelab 中,您将探索 Google Cloud Serverless for Apache Spark 的原生执行引擎 Lightning Engine 的性能优势,并了解它如何优化 Serverless for Apache Spark 上的 Spark 工作负载。

Lightning Engine 使用 VeloxApache Gluten。Velox 是一款用于数据处理的高性能 C++ 引擎。Apache Gluten 是一个中间层,负责将基于 JVM 的 Spark 作业转换为可由 Velox 执行的 C++ 代码。

此演示使用 TPC-DS,这是一种旨在评估决策支持系统性能的行业标准基准。您将提交一个基准 PySpark 作业,以使用标准无服务器层级查询 TPC-DS 示例数据集。然后,您将使用启用 Lightning Engine 的高级层级运行完全相同的作业。最后,您将比较执行时间,并深入了解 Spark 界面,直观呈现硬件加速的 Spark 执行图表的差异。

假设您按照清理部分中的说明及时清理资源,则运行此 Codelab 的估计费用不到 1.00 美元

您将执行的操作

  • 创建一个 Cloud Storage 存储分区,用于存储基准脚本和结果
  • 使用 Serverless for Apache Spark 标准层执行基准 PySpark 数据处理作业
  • 使用 Serverless for Apache Spark 高级层和 Lightning Engine 执行同一作业
  • 比较运行时指标
  • 启动 Spark 历史记录服务器界面,比较原生物理执行图

所需条件

2. 准备工作

创建 Google Cloud 项目

  1. Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目
  2. 确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能

启动 Cloud Shell

Cloud Shell 是在 Google Cloud 中运行的命令行环境,预加载了必要的工具。

  1. 点击 Google Cloud 控制台顶部的激活 Cloud Shell
  2. 连接到 Cloud Shell 后,验证您的身份验证:
    gcloud auth list
    
  3. 确认您的项目已配置:
    gcloud config get project
    
  4. 如果项目未按预期设置,请进行设置:
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

启用 API

运行以下命令可为此 Codelab 启用所有必需的 API:

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. 准备环境

在此步骤中,您将初始化环境变量并创建 Cloud Storage 存储分区。此存储分区将用于存放您提交到两个 Serverless for Apache Spark 层级的 PySpark 脚本。

设置环境变量

在 Cloud Shell 中运行以下命令,以设置默认环境变量。我们将使用 us-central1 区域,但您可以根据需要更改此设置。

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

创建 Cloud Storage 存储桶

创建用于存放脚本和日志的存储分区:

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

将 TPC-DS 数据集复制到您自己的存储分区

在此步骤中,您将把 TPC-DS 数据集从公共存储分区复制到您自己的 Cloud Storage 存储分区。这可确保您的 PySpark 作业能够从您的项目中读取本地数据。

设置环境变量以选择数据集大小和类型:

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

将 TPC-DS 数据复制到您自己的存储分区中:

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

创建 PySpark 基准脚本

我们将使用一个 PySpark 脚本,该脚本会注册 Cloud Storage 存储分区中的标准 TPC-DS 表,并执行来自 Apache Spark 公共代码库的 5 个标准查询。该脚本接受数据集的路径作为实参。

在 Cloud Shell 中创建一个名为 benchmark.py 的文件。您可以复制并粘贴以下命令来生成文件:

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

将脚本复制到 Cloud Storage 存储分区,以便 Serverless for Apache Spark 可以访问该脚本:

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. 运行基准无服务器作业

为了提供不使用 Lightning Engine 的基准比较,请将您之前上传的 PySpark 基准测试作业提交到 Serverless for Apache Spark 标准层。我们将以实参的形式传递您复制的数据集的路径。

运行以下命令以执行批量作业:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

监控作业

作业执行期间,您会在 Cloud Shell 终端中看到 PySpark 日志流。Serverless for Apache Spark 正在分配容器、从 Cloud Storage 读取 TPC-DS Parquet 数据集,并执行复杂的 SQL 计划。

脚本完成后,观察控制台输出。您应该会看到每个已执行的标准查询的结果和时间,如下所示:

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

记下完成测试所用的总秒数。这是您的基准运行时长

5. 使用 Serverless Premium 和 Lightning Engine 运行

接下来,您将在 Serverless for Apache Spark 上运行完全相同的 Spark 作业,但使用高级层并启用 Google 的原生矢量化查询引擎 Lightning Engine

将基准作业提交到无服务器,并明确启用 Lightning Engine:

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

比较结果

等待作业完成并检查输出。您应该会看到相同的查询结果。仔细查看完成时间:

...
All benchmark queries completed in 64.24 seconds.

将基准无服务器运行与无服务器 Lightning Engine 运行进行比较,您会发现 Lightning Engine 通过利用原生 C++ 执行层和后端矢量化处理,更快地执行分组、聚合和联接,而无需对 PySpark 应用代码进行任何更改。

Lightning Engine 经过优化,可随着工作负载的增加而提升性能。在此示例中,我们使用的是小型数据集,因此性能提升幅度不如使用大型数据集时那么显著。在 10 TB 的数据集上,基准测试表明,与开源 Spark 相比,性能最多可提升 4.3 倍

6. 在 Spark 界面中比较执行图

运行时缩短效果令人印象深刻,但我们不妨深入了解 Spark 在查询执行期间实际执行的操作。您可以通过检查这两个作业的 Spark 界面执行图来实现此目的。

  1. 在浏览器中打开 Google Cloud 控制台
  2. 前往 Dataproc > 批处理
  3. 您会在列表中看到两个批次:标准基准测试和高级层级测试。
  4. 点击您运行的 Premium 级批处理,然后依次点击查看 Spark 界面查看详情
  5. 在 Spark 界面中,前往作业标签页。
  6. 已完成的作业下方的搜索框中,输入 Velox
  7. 您会看到许多包含 VeloxSparkPlanExecApi 的职位说明。指 Lightning Engine 使用的 Velox 原生执行引擎。

现在,针对标准层级运行重复此流程:

  1. 返回到“Serverless for Apache Spark 批次”页面。
  2. 点击标准层级批次的链接,然后依次点击查看 Spark 界面查看详情
  3. 在 Spark 界面中,前往作业标签页。
  4. 已完成的作业下方的搜索框中,输入 Velox
  5. 您不会在作业说明中看到任何有关 Velox API 的内容。

7. 清理

为避免系统向您的 Google Cloud 账号持续收取费用,请删除本 Codelab 中创建的资源。

在 Cloud Shell 中,删除 Cloud Storage 存储分区及其内容:

gcloud storage rm -r gs://${BUCKET_NAME}

删除 benchmark.py 的本地副本:

rm benchmark.py

8. 恭喜

恭喜!您已成功为 Apache Spark 构建基准比较环境,并将 Serverless for Apache Spark Standard 与 Serverless for Apache Spark Premium 进行了比较。

您亲眼看到了启用 Serverless for Apache Spark 的全新 Lightning Engine 如何缩短 Spark 工作负载的运行时长,还探索了 Spark 界面,了解了如何使用 Native Query Engine 将物理执行图转换为原生 C++ 代码。

您学到的内容

  • 如何编写 PySpark 数据集基准测试脚本。
  • 如何将 Spark 作业提交到 Serverless for Apache Spark。
  • 如何启用 Lightning Engine。
  • 如何在 Spark 界面中比较作业计划。

后续步骤