1. Giới thiệu
Trong lớp học lập trình này, bạn sẽ khám phá những lợi ích về hiệu suất của Google Cloud Serverless cho Apache Spark's công cụ thực thi gốc, Lightning Engine, và xem xét cách công cụ này tối ưu hoá khối lượng công việc Spark trên Serverless cho Apache Spark.
Lightning Engine sử dụng Velox và Apache Gluten. Velox là một công cụ C++ hiệu suất cao để xử lý dữ liệu. Apache Gluten là một lớp trung gian chịu trách nhiệm chuyển đổi các công việc Spark dựa trên JVM thành mã C++ mà Velox có thể thực thi.
Bản minh hoạ này sử dụng TPC-DS, một điểm chuẩn tiêu chuẩn trong ngành được thiết kế để đánh giá hiệu suất của các hệ thống hỗ trợ quyết định. Bạn sẽ gửi một công việc PySpark cơ sở để truy vấn một tập dữ liệu TPC-DS mẫu bằng tầng Không máy chủ tiêu chuẩn. Sau đó, bạn sẽ chạy chính công việc đó bằng tầng Cao cấp khi bật Lightning Engine. Cuối cùng, bạn sẽ so sánh thời gian thực thi và đi sâu vào Giao diện người dùng Spark để hình dung sự khác biệt trong các biểu đồ thực thi Spark được tăng tốc bằng phần cứng.
Chi phí ước tính để chạy lớp học lập trình này là dưới 1,00 USD, giả sử các tài nguyên được dọn dẹp kịp thời như mô tả trong phần Dọn dẹp.
Bạn sẽ thực hiện
- Tạo một bộ chứa Cloud Storage để lưu trữ các tập lệnh và kết quả đo điểm chuẩn
- Thực thi một công việc xử lý dữ liệu PySpark cơ sở bằng tầng Không máy chủ tiêu chuẩn cho Apache Spark
- Thực thi cùng một công việc bằng tầng Không máy chủ cao cấp cho Apache Spark với Lightning Engine
- So sánh các chỉ số thời gian chạy
- Khởi chạy Giao diện người dùng máy chủ nhật ký Spark để so sánh các biểu đồ thực thi vật lý gốc
Bạn cần có
- Một trình duyệt web như Chrome
- Một dự án Google Cloud đã bật tính năng thanh toán
- Quen thuộc cơ bản với Apache Spark và dòng lệnh Linux
2. Trước khi bắt đầu
Tạo một dự án trên Google Cloud
- Trong Google Cloud Console, trên trang bộ chọn dự án, hãy chọn hoặc tạo một dự án Google Cloud.
- Đảm bảo rằng bạn đã bật tính năng thanh toán cho dự án trên đám mây của bạn. Tìm hiểu cách kiểm tra xem tính năng thanh toán đã được bật trên một dự án hay chưa.
Bắt đầu Cloud Shell
Cloud Shell là một môi trường dòng lệnh chạy trong Google Cloud, được tải sẵn các công cụ cần thiết.
- Nhấp vào Kích hoạt Cloud Shell ở đầu bảng điều khiển Cloud.
- Sau khi kết nối với Cloud Shell, hãy xác minh quá trình xác thực:
gcloud auth list - Xác nhận rằng dự án của bạn đã được định cấu hình:
gcloud config get project - Nếu dự án của bạn không được thiết lập như mong đợi, hãy thiết lập dự án:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Bật API
Chạy lệnh này để bật tất cả các API bắt buộc cho lớp học lập trình này:
gcloud services enable \
dataproc.googleapis.com \
storage.googleapis.com \
compute.googleapis.com
3. Chuẩn bị môi trường
Ở bước này, bạn sẽ khởi chạy các biến môi trường và tạo một bộ chứa Cloud Storage. Bộ chứa này sẽ chứa tập lệnh PySpark mà bạn gửi đến cả hai tầng Không máy chủ cho Apache Spark.
Đặt các biến môi trường
Chạy các lệnh sau trong Cloud Shell để đặt các biến môi trường mặc định. Chúng ta sẽ sử dụng khu vực us-central1, nhưng bạn có thể thay đổi nếu muốn.
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"
gcloud config set dataproc/region ${REGION}
Tạo một bộ chứa Cloud Storage
Tạo bộ chứa để lưu giữ các tập lệnh và nhật ký:
gcloud storage buckets create gs://${BUCKET_NAME} \
--uniform-bucket-level-access \
--location=${REGION}
Sao chép tập dữ liệu TPC-DS vào bộ chứa của riêng bạn
Ở bước này, bạn sẽ sao chép tập dữ liệu TPC-DS từ một bộ chứa công khai vào bộ chứa Cloud Storage của riêng bạn. Điều này đảm bảo rằng các công việc PySpark có thể đọc dữ liệu cục bộ từ dự án của bạn.
Đặt các biến môi trường để chọn kích thước và loại tập dữ liệu:
export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB" # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)
export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"
Sao chép dữ liệu TPC-DS vào bộ chứa của riêng bạn:
gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/
Tạo tập lệnh đo điểm chuẩn PySpark
Chúng ta sẽ sử dụng một tập lệnh PySpark đăng ký các bảng TPC-DS tiêu chuẩn từ bộ chứa Cloud Storage của bạn và thực thi 5 truy vấn tiêu chuẩn có nguồn gốc từ kho lưu trữ công khai Apache Spark. Tập lệnh chấp nhận đường dẫn đến tập dữ liệu của bạn làm đối số.
Tạo một tệp có tên benchmark.py trong Cloud Shell. Bạn có thể sao chép và dán lệnh sau để tạo tệp:
cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time
def main():
parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
args = parser.parse_args()
base_path = args.data_path
# Initialize Spark Session
spark = SparkSession.builder \
.appName("TPC-DS Benchmark") \
.getOrCreate()
print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")
# List of all 24 TPC-DS tables
tables = [
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
"customer", "customer_address", "customer_demographics", "date_dim",
"household_demographics", "income_band", "inventory", "item",
"promotion", "reason", "ship_mode", "store", "store_returns",
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
"web_sales", "web_site"
]
# Register each table as a temporary view
# For this subset of queries, not every table is used
for table in tables:
path = f"{base_path}/{table}"
try:
df = spark.read.parquet(path)
df.createOrReplaceTempView(table)
except Exception as e:
print(f"Warning: Could not load table {table} from {path}. Error: {e}")
print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")
# Standard TPC-DS Queries sourced from Apache Spark public repository:
# https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
queries = {
"Q1": """
WITH customer_total_return AS (
SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
sum(sr_return_amt) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2000
GROUP BY sr_customer_sk, sr_store_sk
)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (
SELECT avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
""",
"Q2": """
WITH wscs AS (
SELECT sold_date_sk, sales_price
FROM (
SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
FROM web_sales
UNION ALL
SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
FROM catalog_sales
)
),
wswscs AS (
SELECT d_week_seq,
sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
FROM wscs, date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq
)
SELECT d_week_seq1,
round(sun_sales1/sun_sales2, 2),
round(mon_sales1/mon_sales2, 2),
round(tue_sales1/tue_sales2, 2),
round(wed_sales1/wed_sales2, 2),
round(thu_sales1/thu_sales2, 2),
round(fri_sales1/fri_sales2, 2),
round(sat_sales1/sat_sales2, 2)
FROM (
SELECT wswscs.d_week_seq AS d_week_seq1,
sun_sales AS sun_sales1, mon_sales AS mon_sales1,
tue_sales AS tue_sales1, wed_sales AS wed_sales1,
thu_sales AS thu_sales1, fri_sales AS fri_sales1,
sat_sales AS sat_sales1
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001
) y,
(
SELECT wswscs.d_week_seq AS d_week_seq2,
sun_sales AS sun_sales2, mon_sales AS mon_sales2,
tue_sales AS tue_sales2, wed_sales AS wed_sales2,
thu_sales AS thu_sales2, fri_sales AS fri_sales2,
sat_sales AS sat_sales2
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001 + 1
) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1
""",
"Q3": """
SELECT dt.d_year,
item.i_brand_id AS brand_id,
item.i_brand AS brand,
sum(ss_ext_sales_price) AS sum_agg
FROM date_dim dt,
store_sales,
item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 436
AND dt.d_moy = 12
GROUP BY dt.d_year,
item.i_brand,
item.i_brand_id
ORDER BY dt.d_year,
sum_agg DESC,
brand_id
LIMIT 100
""",
"Q7": """
SELECT i_item_id,
avg(ss_quantity) AS agg1,
avg(ss_list_price) AS agg2,
avg(ss_coupon_amt) AS agg3,
avg(ss_sales_price) AS agg4
FROM store_sales,
customer_demographics,
date_dim,
item,
promotion
WHERE ss_sold_date_sk = d_date_sk
AND ss_item_sk = i_item_sk
AND ss_cdemo_sk = cd_demo_sk
AND ss_promo_sk = p_promo_sk
AND cd_gender = 'M'
AND cd_marital_status = 'S'
AND cd_education_status = 'College'
AND (p_channel_email = 'N' OR p_channel_event = 'N')
AND d_year = 2000
GROUP BY i_item_id
ORDER BY i_item_id
LIMIT 100
""",
"Q19": """
SELECT i_item_id,
i_brand,
i_category,
i_class,
i_manufact,
sum(ss_ext_sales_price) AS sales,
sum(ss_net_profit) AS profit
FROM date_dim,
store_sales,
item,
customer,
store
WHERE d_date_sk = ss_sold_date_sk
AND i_item_sk = ss_item_sk
AND d_year = 2000
AND d_moy = 12
AND c_customer_sk = ss_customer_sk
AND s_store_sk = ss_store_sk
AND i_manager_id = 9
GROUP BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
ORDER BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
LIMIT 100
"""
}
total_start_time = time.time()
for query_name, query_sql in queries.items():
print(f"\nExecuting {query_name}...")
query_start = time.time()
# Execute query and force action using show()
result_df = spark.sql(query_sql)
result_df.show(5) # Show top 5 rows
query_end = time.time()
print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")
total_end_time = time.time()
print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")
spark.stop()
if __name__ == "__main__":
main()
EOF
Sao chép tập lệnh lên bộ chứa Cloud Storage để Không máy chủ cho Apache Spark có thể truy cập vào tập lệnh đó:
gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py
4. Chạy công việc Không máy chủ cơ sở
Để so sánh đường cơ sở mà không cần Lightning Engine, hãy gửi công việc đo điểm chuẩn PySpark mà bạn đã tải lên trước đó đến tầng Không máy chủ cho Apache Spark tiêu chuẩn. Chúng ta sẽ truyền đường dẫn đến tập dữ liệu mà bạn đã sao chép làm đối số.
Chạy lệnh sau để thực thi công việc hàng loạt:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
-- ${DATASET_PATH}
Giám sát công việc
Trong khi công việc đang thực thi, bạn sẽ thấy nhật ký PySpark phát trực tuyến trong thiết bị đầu cuối Cloud Shell. Không máy chủ cho Apache Spark đang phân bổ các bộ chứa, đọc tập dữ liệu TPC-DS Parquet từ Cloud Storage và thực thi các kế hoạch SQL phức tạp.
Sau khi tập lệnh hoàn tất, hãy quan sát đầu ra của bảng điều khiển. Bạn sẽ thấy kết quả và thời gian cho từng truy vấn tiêu chuẩn đã thực thi, tương tự như:
... Executing Q1... +-------------+ |c_customer_id| +-------------+ ... Q1 completed in 18.52 seconds. ... All benchmark queries completed in 110.94 seconds.
Ghi lại tổng số giây để hoàn tất. Đây là thời gian chạy cơ sở của bạn.
5. Chạy với Không máy chủ cao cấp và Lightning Engine
Tiếp theo, bạn sẽ chạy chính công việc Spark đó trên Không máy chủ cho Apache Spark, nhưng sử dụng tầng Cao cấp và bật công cụ truy vấn được vectơ hoá, gốc của Google: Lightning Engine.
Gửi công việc đo điểm chuẩn đến Không máy chủ khi bật Lightning Engine một cách rõ ràng:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
--properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
-- ${DATASET_PATH}
So sánh kết quả
Chờ công việc hoàn tất và kiểm tra đầu ra. Bạn sẽ thấy cùng kết quả truy vấn. Hãy xem kỹ thời gian hoàn thành:
... All benchmark queries completed in 64.24 seconds.
Khi so sánh lần chạy Không máy chủ cơ sở với lần chạy Lightning Engine Không máy chủ, bạn sẽ nhận thấy rằng Lightning Engine thực thi việc nhóm, tổng hợp và kết hợp nhanh hơn bằng cách sử dụng lớp thực thi C++ gốc và xử lý được vectơ hoá trên phần phụ trợ mà không yêu cầu bất kỳ thay đổi nào đối với mã xử lý ứng dụng PySpark.
Lightning Engine được tối ưu hoá để tăng hiệu suất khi khối lượng công việc càng lớn. Trong ví dụ này, chúng ta đang sử dụng một tập dữ liệu nhỏ, vì vậy, mức tăng hiệu suất không đáng kể như có thể. Trên tập dữ liệu 10 TB, điểm chuẩn cho thấy hiệu suất cải thiện lên đến 4,3 lần so với Spark nguồn mở.
6. So sánh biểu đồ thực thi trong Giao diện người dùng Spark
Việc giảm thời gian chạy là rất ấn tượng, nhưng hãy xem bên trong những gì Spark thực sự đang làm trong quá trình thực thi truy vấn. Bạn có thể thực hiện việc này bằng cách kiểm tra các biểu đồ thực thi của Giao diện người dùng Spark cho cả hai công việc.
- Mở bảng điều khiển Cloud trong trình duyệt.
- Chuyển đến Dataproc > Batches (Dataproc > Hàng loạt).
- Bạn sẽ thấy hai lô trong danh sách: lần chạy cơ sở tiêu chuẩn và lần chạy tầng Cao cấp.
- Nhấp vào lô tầng Cao cấp mà bạn đã chạy, sau đó nhấp vào View Spark UI (Xem giao diện người dùng Spark) rồi nhấp vào View Details (Xem thông tin chi tiết).
- Trong Giao diện người dùng Spark, hãy chuyển đến thẻ Jobs (Công việc).
- Trong phần Completed Jobs (Công việc đã hoàn tất), trong hộp tìm kiếm, hãy nhập
Velox. - Bạn sẽ thấy nhiều nội dung mô tả công việc có chứa
VeloxSparkPlanExecApi. Điều này đề cập đến công cụ thực thi gốc Velox mà Lightning Engine đang sử dụng.
Bây giờ, hãy lặp lại quy trình này cho lần chạy tầng Tiêu chuẩn:
- Quay lại trang Hàng loạt Không máy chủ cho Apache Spark.
- Nhấp vào đường liên kết cho lô tầng Tiêu chuẩn, sau đó nhấp vào View Spark UI (Xem giao diện người dùng Spark) rồi nhấp vào View Details (Xem thông tin chi tiết).
- Trong Giao diện người dùng Spark, hãy chuyển đến thẻ Jobs (Công việc).
- Trong phần Completed Jobs (Công việc đã hoàn tất), trong hộp tìm kiếm, hãy nhập
Velox. - Bạn sẽ không thấy đề cập đến API Velox trong nội dung mô tả công việc.
7. Dọn dẹp
Để tránh các khoản phí liên tục cho tài khoản Google Cloud, hãy xoá các tài nguyên được tạo trong lớp học lập trình này.
Trong Cloud Shell, hãy xoá bộ chứa Cloud Storage và nội dung của bộ chứa đó:
gcloud storage rm -r gs://${BUCKET_NAME}
Xoá bản sao cục bộ của benchmark.py:
rm benchmark.py
8. Xin chúc mừng
Xin chúc mừng! Bạn đã xây dựng thành công một môi trường đo điểm chuẩn cho Apache Spark và so sánh Không máy chủ cho Apache Spark tiêu chuẩn với Không máy chủ cho Apache Spark cao cấp.
Bạn đã trực tiếp thấy cách bật Lightning Engine mới của Không máy chủ cho Apache Spark có thể giảm thời gian chạy của khối lượng công việc Spark và bạn đã khám phá Giao diện người dùng Spark để xem cách biểu đồ thực thi vật lý được chuyển đổi thành mã C++ gốc bằng Công cụ truy vấn gốc.
Kiến thức bạn học được
- Cách viết tập lệnh đo điểm chuẩn tập dữ liệu PySpark.
- Cách gửi công việc Spark đến Không máy chủ cho Apache Spark.
- Cách bật Lightning Engine.
- Cách so sánh các kế hoạch công việc trong Giao diện người dùng Spark.
Các bước tiếp theo
- Khám phá Tài liệu về Không máy chủ cho Apache Spark
- Xem công cụ đủ điều kiện Thực thi truy vấn gốc
- Xem các truy vấn đo điểm chuẩn TPC-DS đầy đủ trên GitHub