1. 簡介
在本程式碼研究室中,您將探索 Google Cloud Serverless for Apache Spark 原生執行引擎 Lightning Engine 的效能優勢,並瞭解該引擎如何最佳化 Serverless for Apache Spark 上的 Spark 工作負載。
Lightning Engine 使用 Velox 和 Apache Gluten。Velox 是用於資料處理的高效能 C++ 引擎。Apache Gluten 是中間層,負責將以 JVM 為基礎的 Spark 工作轉換為可由 Velox 執行的 C++ 程式碼。
本示範使用 TPC-DS,這是業界標準的基準,旨在評估決策支援系統的效能。您將提交基準 PySpark 工作,使用標準無伺服器層級查詢範例 TPC-DS 資料集。接著,您會使用啟用 Lightning Engine 的 Premium 層級,執行完全相同的工作。最後,您會比較執行時間,並深入瞭解 Spark UI,以視覺化方式呈現硬體加速 Spark 執行圖表的差異。
假設您按照「清理」一節所述,及時清理資源,執行本程式碼研究室的預估費用不到 $1.00 美元。
學習內容
- 建立 Cloud Storage bucket,儲存基準測試指令碼和結果
- 使用 Serverless for Apache Spark Standard 層級執行基準 PySpark 資料處理工作
- 使用 Serverless for Apache Spark Premium 層級和 Lightning Engine 執行相同工作
- 比較執行階段指標
- 啟動 Spark 記錄伺服器 UI,比較原生實體執行圖
軟硬體需求
- 網路瀏覽器,例如 Chrome
- 已啟用計費功能的 Google Cloud 專案
- 熟悉 Apache Spark 和 Linux 指令列的基本操作
2. 事前準備
建立 Google Cloud 專案
- 在 Google Cloud 控制台的專案選取器頁面中,選取或建立 Google Cloud 專案。
- 確認 Cloud 專案已啟用計費功能。瞭解如何檢查專案是否已啟用計費功能。
啟動 Cloud Shell
Cloud Shell 是在 Google Cloud 中運作的指令列環境,已預先載入必要工具。
- 點選 Google Cloud 控制台頂端的「啟用 Cloud Shell」。
- 連至 Cloud Shell 後,請驗證您的驗證:
gcloud auth list - 確認專案已設定完成:
gcloud config get project - 如果專案未如預期設定,請設定專案:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
啟用 API
執行下列指令,啟用本程式碼研究室的所有必要 API:
gcloud services enable \
dataproc.googleapis.com \
storage.googleapis.com \
compute.googleapis.com
3. 準備環境
在這個步驟中,您將初始化環境變數並建立 Cloud Storage bucket。這個 bucket 會存放您提交至兩個 Serverless for Apache Spark 層級的 PySpark 指令碼。
設定環境變數
在 Cloud Shell 執行下列指令,設定預設環境變數。我們將使用 us-central1 區域,但您可以視需要變更。
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"
gcloud config set dataproc/region ${REGION}
建立 Cloud Storage bucket
建立 bucket 來存放指令碼和記錄:
gcloud storage buckets create gs://${BUCKET_NAME} \
--uniform-bucket-level-access \
--location=${REGION}
將 TPC-DS 資料集複製到自己的值區
在這個步驟中,您會將 TPC-DS 資料集從公開 bucket 複製到自己的 Cloud Storage bucket。確保 PySpark 工作可從專案本機讀取資料。
設定環境變數,選擇資料集大小和類型:
export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB" # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)
export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"
將 TPC-DS 資料複製到自己的 bucket:
gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/
建立 PySpark 基準指令碼
我們將使用 PySpark 指令碼,從 Cloud Storage 值區註冊標準 TPC-DS 資料表,並執行從 Apache Spark 公開存放區取得的 5 個標準查詢。指令碼會將資料集路徑做為引數。
在 Cloud Shell 中建立名為 benchmark.py 的檔案。您可以複製並貼上下列指令來產生檔案:
cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time
def main():
parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
args = parser.parse_args()
base_path = args.data_path
# Initialize Spark Session
spark = SparkSession.builder \
.appName("TPC-DS Benchmark") \
.getOrCreate()
print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")
# List of all 24 TPC-DS tables
tables = [
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
"customer", "customer_address", "customer_demographics", "date_dim",
"household_demographics", "income_band", "inventory", "item",
"promotion", "reason", "ship_mode", "store", "store_returns",
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
"web_sales", "web_site"
]
# Register each table as a temporary view
# For this subset of queries, not every table is used
for table in tables:
path = f"{base_path}/{table}"
try:
df = spark.read.parquet(path)
df.createOrReplaceTempView(table)
except Exception as e:
print(f"Warning: Could not load table {table} from {path}. Error: {e}")
print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")
# Standard TPC-DS Queries sourced from Apache Spark public repository:
# https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
queries = {
"Q1": """
WITH customer_total_return AS (
SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
sum(sr_return_amt) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2000
GROUP BY sr_customer_sk, sr_store_sk
)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (
SELECT avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
""",
"Q2": """
WITH wscs AS (
SELECT sold_date_sk, sales_price
FROM (
SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
FROM web_sales
UNION ALL
SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
FROM catalog_sales
)
),
wswscs AS (
SELECT d_week_seq,
sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
FROM wscs, date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq
)
SELECT d_week_seq1,
round(sun_sales1/sun_sales2, 2),
round(mon_sales1/mon_sales2, 2),
round(tue_sales1/tue_sales2, 2),
round(wed_sales1/wed_sales2, 2),
round(thu_sales1/thu_sales2, 2),
round(fri_sales1/fri_sales2, 2),
round(sat_sales1/sat_sales2, 2)
FROM (
SELECT wswscs.d_week_seq AS d_week_seq1,
sun_sales AS sun_sales1, mon_sales AS mon_sales1,
tue_sales AS tue_sales1, wed_sales AS wed_sales1,
thu_sales AS thu_sales1, fri_sales AS fri_sales1,
sat_sales AS sat_sales1
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001
) y,
(
SELECT wswscs.d_week_seq AS d_week_seq2,
sun_sales AS sun_sales2, mon_sales AS mon_sales2,
tue_sales AS tue_sales2, wed_sales AS wed_sales2,
thu_sales AS thu_sales2, fri_sales AS fri_sales2,
sat_sales AS sat_sales2
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001 + 1
) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1
""",
"Q3": """
SELECT dt.d_year,
item.i_brand_id AS brand_id,
item.i_brand AS brand,
sum(ss_ext_sales_price) AS sum_agg
FROM date_dim dt,
store_sales,
item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 436
AND dt.d_moy = 12
GROUP BY dt.d_year,
item.i_brand,
item.i_brand_id
ORDER BY dt.d_year,
sum_agg DESC,
brand_id
LIMIT 100
""",
"Q7": """
SELECT i_item_id,
avg(ss_quantity) AS agg1,
avg(ss_list_price) AS agg2,
avg(ss_coupon_amt) AS agg3,
avg(ss_sales_price) AS agg4
FROM store_sales,
customer_demographics,
date_dim,
item,
promotion
WHERE ss_sold_date_sk = d_date_sk
AND ss_item_sk = i_item_sk
AND ss_cdemo_sk = cd_demo_sk
AND ss_promo_sk = p_promo_sk
AND cd_gender = 'M'
AND cd_marital_status = 'S'
AND cd_education_status = 'College'
AND (p_channel_email = 'N' OR p_channel_event = 'N')
AND d_year = 2000
GROUP BY i_item_id
ORDER BY i_item_id
LIMIT 100
""",
"Q19": """
SELECT i_item_id,
i_brand,
i_category,
i_class,
i_manufact,
sum(ss_ext_sales_price) AS sales,
sum(ss_net_profit) AS profit
FROM date_dim,
store_sales,
item,
customer,
store
WHERE d_date_sk = ss_sold_date_sk
AND i_item_sk = ss_item_sk
AND d_year = 2000
AND d_moy = 12
AND c_customer_sk = ss_customer_sk
AND s_store_sk = ss_store_sk
AND i_manager_id = 9
GROUP BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
ORDER BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
LIMIT 100
"""
}
total_start_time = time.time()
for query_name, query_sql in queries.items():
print(f"\nExecuting {query_name}...")
query_start = time.time()
# Execute query and force action using show()
result_df = spark.sql(query_sql)
result_df.show(5) # Show top 5 rows
query_end = time.time()
print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")
total_end_time = time.time()
print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")
spark.stop()
if __name__ == "__main__":
main()
EOF
將指令碼複製到 Cloud Storage 值區,讓 Serverless for Apache Spark 存取:
gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py
4. 執行基準無伺服器作業
如要提供不含 Lightning Engine 的基準比較,請將先前上傳的 PySpark 基準化工作提交至 Serverless for Apache Spark Standard 層級。我們會將您複製的資料集路徑做為引數傳遞。
執行下列指令來執行批次工作:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
-- ${DATASET_PATH}
監控工作
工作執行期間,您會在 Cloud Shell 終端機中看到 PySpark 記錄串流。Serverless for Apache Spark 會分配容器、從 Cloud Storage 讀取 TPC-DS Parquet 資料集,並執行複雜的 SQL 計畫。
指令碼執行完畢後,請觀察控制台輸出內容。您應該會看到每個執行的標準查詢結果和時間,類似於:
... Executing Q1... +-------------+ |c_customer_id| +-------------+ ... Q1 completed in 18.52 seconds. ... All benchmark queries completed in 110.94 seconds.
請記下完成作業的總秒數。這是您的基準執行階段。
5. 使用 Serverless Premium 和 Lightning Engine 執行
接著,您會在 Serverless for Apache Spark 上執行完全相同的 Spark 工作,但會使用進階級,並啟用 Google 的原生向量化查詢引擎 Lightning Engine。
提交基準工作至 Serverless,並明確啟用 Lightning Engine:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
--properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
-- ${DATASET_PATH}
比較結果
等待工作完成並檢查輸出內容。您應該會看到相同的查詢結果。請仔細查看完成時間:
... All benchmark queries completed in 64.24 seconds.
比較基準無伺服器執行作業與無伺服器 Lightning Engine 執行作業後,您會發現 Lightning Engine 運用原生 C++ 執行層和後端的向量化處理,可更快執行分組、彙整和聯結作業,而且完全不需要變更 PySpark 應用程式程式碼。
工作負載越大,Lightning Engine 的效能就越好。在本例中,我們使用的是小型資料集,因此效能提升幅度可能不如預期。在 10 TB 的資料集上,基準測試顯示效能較開放原始碼 Spark 提升 4.3 倍。
6. 在 Spark UI 中比較執行圖
執行時間縮短的幅度令人驚豔,但讓我們深入瞭解 Spark 在查詢執行期間實際執行的作業。您可以檢查這兩項工作的 Spark UI 執行圖表,瞭解這點。
- 在瀏覽器中開啟 Google Cloud 控制台。
- 依序前往「Dataproc」>「Batches」(批次)。
- 清單中會顯示兩批資料:標準基準執行作業和進階級執行作業。
- 按一下您執行的 Premium 層級批次,然後依序點選「View Spark UI」(查看 Spark UI) 和「View Details」(查看詳細資料)。
- 在 Spark UI 中,前往「Jobs」分頁。
- 在「已完成的工作」下方的搜尋框中輸入
Velox。 - 您會看到許多包含
VeloxSparkPlanExecApi的職位說明。這是指 Lightning Engine 使用的 Velox 原生執行引擎。
現在,請針對標準層級執行重複這個程序:
- 返回 Serverless for Apache Spark Batches 頁面。
- 按一下「Standard tier」(標準層級) 批次的連結,然後依序點選「View Spark UI」(查看 Spark UI) 和「View Details」(查看詳細資料)。
- 在 Spark UI 中,前往「Jobs」分頁。
- 在「已完成的工作」下方的搜尋框中輸入
Velox。 - 工作說明中不會提及 Velox API。
7. 清理
如要避免系統持續向您的 Google Cloud 帳戶收費,請刪除本程式碼研究室建立的資源。
在 Cloud Shell 中,刪除 Cloud Storage bucket 和當中內容:
gcloud storage rm -r gs://${BUCKET_NAME}
刪除 benchmark.py 的本機副本:
rm benchmark.py
8. 恭喜
恭喜!您已成功建構 Apache Spark 的基準化環境,並比較 Serverless for Apache Spark Standard 與 Serverless for Apache Spark Premium。
您親眼見證啟用 Serverless for Apache Spark 的全新 Lightning Engine 後,Spark 工作負載的執行時間如何縮短,並探索 Spark UI,瞭解實體執行圖如何使用 Native Query Engine 轉換為原生 C++ 程式碼。
目前所學內容
- 如何編寫 PySpark 資料集基準測試指令碼。
- 如何將 Spark 工作提交至 Serverless for Apache Spark。
- 如何啟用 Lightning Engine。
- 如何在 Spark UI 中比較工作方案。