1. مقدمة
في هذا الدرس التطبيقي حول الترميز، ستتعرّف على مزايا الأداء التي يقدّمها محرك التنفيذ الأصلي Lightning Engine في الحوسبة بدون خادم من Google Cloud لـ Apache Spark، وستتعرّف على كيفية تحسين أحمال عمل Spark على الحوسبة بدون خادم من Apache Spark.
تستخدم Lightning Engine Velox وApache Gluten. Velox هو محرك C++ عالي الأداء لمعالجة البيانات. Apache Gluten هي طبقة وسيطة مسؤولة عن تحويل مهام Spark المستندة إلى JVM إلى رمز C++ يمكن تنفيذه بواسطة Velox.
يستخدم هذا العرض التوضيحي TPC-DS، وهو معيار صناعي مصمّم لتقييم أداء أنظمة دعم اتخاذ القرار. سترسل مهمة PySpark أساسية لطلب البحث عن نموذج مجموعة بيانات TPC-DS باستخدام فئة Standard Serverless. بعد ذلك، ستشغّل المهمة نفسها تمامًا باستخدام فئة Premium مع تفعيل Lightning Engine. أخيرًا، ستتم مقارنة وقت التنفيذ والتعمّق في واجهة مستخدم Spark لتصوّر الفرق في الرسوم البيانية لتنفيذ Spark المُسارع بالأجهزة.
تبلغ التكلفة المقدَّرة لتنفيذ هذا الدرس العملي أقل من 1.00 دولار أمريكي، على افتراض أنّه سيتم إخلاء مساحة الموارد على الفور كما هو موضّح في قسم إخلاء المساحة.
الإجراءات التي ستنفذّها
- أنشئ حزمة Cloud Storage لتخزين نصوص البرامج والنتائج الخاصة بمقاييس الأداء
- تنفيذ مهمة أساسية لمعالجة البيانات في PySpark باستخدام الفئة العادية من خدمة "الحوسبة بدون خادم من Google Cloud لـ Apache Spark"
- تنفيذ المهمة نفسها باستخدام الفئة المميزة من الحوسبة بدون خادم من Apache Spark مع Lightning Engine
- مقارنة مقاييس وقت التشغيل
- تشغيل واجهة مستخدم Spark History Server لمقارنة الرسوم البيانية الأصلية للتنفيذ الفعلي
المتطلبات
- متصفّح ويب، مثل Chrome
- مشروع Google Cloud تم تفعيل الفوترة فيه
- معرفة أساسية بـ Apache Spark وسطر أوامر Linux
2. قبل البدء
إنشاء مشروع على Google Cloud
- في Google Cloud Console، في صفحة اختيار المشروع، اختَر مشروعًا على Google Cloud أو أنشِئ مشروعًا.
- تأكَّد من تفعيل الفوترة لمشروعك على السحابة الإلكترونية. كيفية التحقّق مما إذا كانت الفوترة مفعَّلة في مشروع
بدء Cloud Shell
Cloud Shell هي بيئة سطر أوامر تعمل في Google Cloud ومحمّلة مسبقًا بالأدوات اللازمة.
- انقر على تفعيل Cloud Shell في أعلى "وحدة تحكّم Google Cloud".
- بعد الاتصال بـ Cloud Shell، تحقَّق من المصادقة باتّباع الخطوات التالية:
gcloud auth list - تأكَّد من إعداد مشروعك باتّباع الخطوات التالية:
gcloud config get project - إذا لم يتم ضبط مشروعك على النحو المتوقّع، اضبطه باتّباع الخطوات التالية:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
تفعيل واجهات برمجة التطبيقات
نفِّذ الأمر التالي لتفعيل جميع واجهات برمجة التطبيقات المطلوبة لهذا الدرس العملي:
gcloud services enable \
dataproc.googleapis.com \
storage.googleapis.com \
compute.googleapis.com
3- إعداد البيئة
في هذه الخطوة، عليك إعداد المتغيرات البيئية وإنشاء حزمة في Cloud Storage. سيحتوي هذا الحِزمة على نص PySpark البرمجي الذي ترسله إلى كلتا فئتي "الحوسبة بدون خادم من Google Cloud لـ Apache Spark".
ضبط متغيرات البيئة
نفِّذ الأوامر التالية في Cloud Shell لضبط متغيرات البيئة التلقائية. سنستخدم المنطقة us-central1، ولكن يمكنك تغييرها إذا كنت تفضّل ذلك.
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"
gcloud config set dataproc/region ${REGION}
إنشاء حزمة في Cloud Storage
أنشئ الحزمة التي ستتضمّن النصوص البرمجية والسجلات:
gcloud storage buckets create gs://${BUCKET_NAME} \
--uniform-bucket-level-access \
--location=${REGION}
نسخ مجموعة بيانات TPC-DS إلى الحزمة الخاصة بك
في هذه الخطوة، ستنسخ مجموعة بيانات TPC-DS من حزمة عامة إلى حزمتك الخاصة على Cloud Storage. يضمن ذلك إمكانية قراءة مهام PySpark للبيانات محليًا من مشروعك.
اضبط متغيّرات البيئة لاختيار حجم مجموعة البيانات ونوعها:
export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB" # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)
export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"
انسخ بيانات TPC-DS إلى الحزمة الخاصة بك:
gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/
إنشاء نص PySpark البرمجي الخاص باختبار الأداء
سنستخدم نصًا برمجيًا من PySpark يسجّل جداول TPC-DS العادية من حزمة Cloud Storage وينفّذ 5 طلبات بحث عادية مصدرها مستودع Apache Spark العام. يقبل النص البرمجي مسار مجموعة البيانات كوسيطة.
أنشئ ملفًا باسم benchmark.py في Cloud Shell. يمكنك نسخ الأمر التالي ولصقه لإنشاء الملف:
cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time
def main():
parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
args = parser.parse_args()
base_path = args.data_path
# Initialize Spark Session
spark = SparkSession.builder \
.appName("TPC-DS Benchmark") \
.getOrCreate()
print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")
# List of all 24 TPC-DS tables
tables = [
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
"customer", "customer_address", "customer_demographics", "date_dim",
"household_demographics", "income_band", "inventory", "item",
"promotion", "reason", "ship_mode", "store", "store_returns",
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
"web_sales", "web_site"
]
# Register each table as a temporary view
# For this subset of queries, not every table is used
for table in tables:
path = f"{base_path}/{table}"
try:
df = spark.read.parquet(path)
df.createOrReplaceTempView(table)
except Exception as e:
print(f"Warning: Could not load table {table} from {path}. Error: {e}")
print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")
# Standard TPC-DS Queries sourced from Apache Spark public repository:
# https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
queries = {
"Q1": """
WITH customer_total_return AS (
SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
sum(sr_return_amt) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2000
GROUP BY sr_customer_sk, sr_store_sk
)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (
SELECT avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
""",
"Q2": """
WITH wscs AS (
SELECT sold_date_sk, sales_price
FROM (
SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
FROM web_sales
UNION ALL
SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
FROM catalog_sales
)
),
wswscs AS (
SELECT d_week_seq,
sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
FROM wscs, date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq
)
SELECT d_week_seq1,
round(sun_sales1/sun_sales2, 2),
round(mon_sales1/mon_sales2, 2),
round(tue_sales1/tue_sales2, 2),
round(wed_sales1/wed_sales2, 2),
round(thu_sales1/thu_sales2, 2),
round(fri_sales1/fri_sales2, 2),
round(sat_sales1/sat_sales2, 2)
FROM (
SELECT wswscs.d_week_seq AS d_week_seq1,
sun_sales AS sun_sales1, mon_sales AS mon_sales1,
tue_sales AS tue_sales1, wed_sales AS wed_sales1,
thu_sales AS thu_sales1, fri_sales AS fri_sales1,
sat_sales AS sat_sales1
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001
) y,
(
SELECT wswscs.d_week_seq AS d_week_seq2,
sun_sales AS sun_sales2, mon_sales AS mon_sales2,
tue_sales AS tue_sales2, wed_sales AS wed_sales2,
thu_sales AS thu_sales2, fri_sales AS fri_sales2,
sat_sales AS sat_sales2
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001 + 1
) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1
""",
"Q3": """
SELECT dt.d_year,
item.i_brand_id AS brand_id,
item.i_brand AS brand,
sum(ss_ext_sales_price) AS sum_agg
FROM date_dim dt,
store_sales,
item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 436
AND dt.d_moy = 12
GROUP BY dt.d_year,
item.i_brand,
item.i_brand_id
ORDER BY dt.d_year,
sum_agg DESC,
brand_id
LIMIT 100
""",
"Q7": """
SELECT i_item_id,
avg(ss_quantity) AS agg1,
avg(ss_list_price) AS agg2,
avg(ss_coupon_amt) AS agg3,
avg(ss_sales_price) AS agg4
FROM store_sales,
customer_demographics,
date_dim,
item,
promotion
WHERE ss_sold_date_sk = d_date_sk
AND ss_item_sk = i_item_sk
AND ss_cdemo_sk = cd_demo_sk
AND ss_promo_sk = p_promo_sk
AND cd_gender = 'M'
AND cd_marital_status = 'S'
AND cd_education_status = 'College'
AND (p_channel_email = 'N' OR p_channel_event = 'N')
AND d_year = 2000
GROUP BY i_item_id
ORDER BY i_item_id
LIMIT 100
""",
"Q19": """
SELECT i_item_id,
i_brand,
i_category,
i_class,
i_manufact,
sum(ss_ext_sales_price) AS sales,
sum(ss_net_profit) AS profit
FROM date_dim,
store_sales,
item,
customer,
store
WHERE d_date_sk = ss_sold_date_sk
AND i_item_sk = ss_item_sk
AND d_year = 2000
AND d_moy = 12
AND c_customer_sk = ss_customer_sk
AND s_store_sk = ss_store_sk
AND i_manager_id = 9
GROUP BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
ORDER BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
LIMIT 100
"""
}
total_start_time = time.time()
for query_name, query_sql in queries.items():
print(f"\nExecuting {query_name}...")
query_start = time.time()
# Execute query and force action using show()
result_df = spark.sql(query_sql)
result_df.show(5) # Show top 5 rows
query_end = time.time()
print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")
total_end_time = time.time()
print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")
spark.stop()
if __name__ == "__main__":
main()
EOF
انسخ النص البرمجي إلى حزمة Cloud Storage ليتمكّن "الحوسبة بدون خادم من Google Cloud لـ Apache Spark" من الوصول إليه:
gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py
4. تشغيل مهمة Baseline Serverless
لتقديم مقارنة أساسية بدون Lightning Engine، أرسِل مهمة قياس الأداء PySpark التي حمّلتها سابقًا إلى فئة "عادية" من Serverless for Apache Spark. سنمرّر المسار إلى مجموعة البيانات التي نسختها كوسيطة.
نفِّذ الأمر التالي لتشغيل مهمة الدفعات:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
-- ${DATASET_PATH}
مراقبة المهمة
أثناء تنفيذ المهمة، ستظهر لك سجلات PySpark يتم بثها في نافذة Cloud Shell. تعمل خدمة Serverless for Apache Spark على تخصيص الحاويات وقراءة مجموعة بيانات TPC-DS Parquet من Cloud Storage وتنفيذ خطط SQL المعقّدة.
بعد اكتمال النص البرمجي، راقِب ناتج وحدة التحكّم. من المفترض أن تظهر لك نتائج وتوقيتات لكل طلب بحث عادي تم تنفيذه، على النحو التالي:
... Executing Q1... +-------------+ |c_customer_id| +-------------+ ... Q1 completed in 18.52 seconds. ... All benchmark queries completed in 110.94 seconds.
دوِّن إجمالي عدد الثواني التي استغرقها إكمال العملية. هذا هو وقت التشغيل الأساسي.
5- التنفيذ باستخدام Serverless Premium وLightning Engine
بعد ذلك، ستشغّل مهمة Spark نفسها تمامًا على Serverless for Apache Spark، ولكن باستخدام الفئة المميّزة وتفعيل محرّك طلب البحث الأصلي المتّجه من Google: Lightning Engine.
أرسِل مهمة قياس الأداء إلى Serverless مع تفعيل Lightning Engine بشكل صريح:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
--properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
-- ${DATASET_PATH}
مقارنة النتائج
انتظِر حتى تكتمل المهمة وافحص الناتج. من المفترض أن تظهر لك نتائج الاستعلام نفسها. انتبه جيدًا إلى وقت الإكمال:
... All benchmark queries completed in 64.24 seconds.
عند مقارنة عملية التشغيل الأساسية في Serverless بعملية التشغيل في Serverless Lightning Engine، ستلاحظ أنّ Lightning Engine ينفّذ عمليات التجميع والتجميع والدمج بشكل أسرع من خلال استخدام طبقة تنفيذ C++ أصلية ومعالجة متجهة في الخلفية، بدون الحاجة إلى إجراء أي تغييرات على الرمز البرمجي لتطبيق PySpark.
تم تحسين Lightning Engine لتعزيز الأداء كلما زاد حجم عبء العمل. في هذا المثال، نستخدم مجموعة بيانات صغيرة، لذا فإنّ التحسّن في الأداء ليس كبيرًا كما يمكن أن يكون. في مجموعة بيانات بحجم 10 تيرابايت، تم رصد تحسُّن في الأداء يصل إلى 4.3 مرّة مقارنةً بـ Spark المفتوح المصدر في مقاييس الأداء.
6. مقارنة "رسومات التنفيذ" في واجهة مستخدم Spark
إنّ انخفاض وقت التشغيل مثير للإعجاب، ولكن دعونا نلقي نظرة على التفاصيل لمعرفة ما يفعله Spark فعليًا أثناء تنفيذ طلب البحث. يمكنك إجراء ذلك من خلال فحص رسومات التنفيذ البيانية في واجهة مستخدم Spark لكلتا المهمتين.
- افتح Google Cloud Console في المتصفّح.
- انتقِل إلى Dataproc > الدفعات.
- ستظهر مجموعتان في القائمة: المجموعة الأساسية العادية والمجموعة المميزة.
- انقر على دفعة المستوى المميز التي نفّذتها، ثم انقر على عرض واجهة مستخدم Spark، ثم على عرض التفاصيل.
- في واجهة مستخدم Spark، انتقِل إلى علامة التبويب المهام (Jobs).
- ضمن المهام المكتملة، اكتب
Veloxفي مربّع البحث. - ستظهر لك العديد من الأوصاف الوظيفية التي تتضمّن
VeloxSparkPlanExecApi. يشير ذلك إلى محرّك التنفيذ الأصلي Velox الذي يستخدمه Lightning Engine.
كرِّر هذه العملية الآن لتنفيذ فئة Standard:
- ارجع إلى صفحة "الحوسبة بدون خادم لعمليات Apache Spark المجمّعة".
- انقر على رابط مجموعة المستوى العادي، ثمّ انقر على عرض واجهة مستخدم Spark، ثمّ على عرض التفاصيل.
- في واجهة مستخدم Spark، انتقِل إلى علامة التبويب المهام (Jobs).
- ضمن المهام المكتملة، اكتب
Veloxفي مربّع البحث. - لن تجد أي إشارة إلى واجهة برمجة التطبيقات Velox في أوصاف الوظائف.
7. تَنظيم
لتجنُّب الرسوم المستمرة على حسابك على Google Cloud، احذف الموارد التي تم إنشاؤها أثناء هذا الدرس التطبيقي حول الترميز.
في Cloud Shell، احذف حزمة Cloud Storage ومحتواها:
gcloud storage rm -r gs://${BUCKET_NAME}
لحذف النسخة المحفوظة على جهازك من benchmark.py، اتّبِع الخطوات التالية:
rm benchmark.py
8. تهانينا
تهانينا! لقد نجحت في إنشاء بيئة قياس أداء لـ Apache Spark ومقارنة "الحوسبة بدون خادم من Google Cloud لـ Apache Spark" (الإصدار العادي) مع "الحوسبة بدون خادم من Google Cloud لـ Apache Spark" (الإصدار المميّز).
لقد رأيت بنفسك كيف يمكن أن يؤدي تفعيل ميزة "الحوسبة بدون خادم" في Lightning Engine الجديد من Apache Spark إلى تقليل وقت تشغيل أحمال عمل Spark، واستكشفت واجهة مستخدم Spark لمعرفة كيفية تحويل الرسم البياني للتنفيذ الفعلي إلى رمز C++ أصلي باستخدام Native Query Engine.
ما تعلّمته
- كيفية كتابة نص برمجي لقياس أداء مجموعة بيانات PySpark
- كيفية إرسال مهام Spark إلى "الحوسبة بدون خادم من Google Cloud لـ Apache Spark"
- كيفية تفعيل Lightning Engine
- كيفية مقارنة خطط المهام في واجهة مستخدم Spark
الخطوات التالية
- الاطّلاع على مستندات الحوسبة بدون خادم لـ Apache Spark
- الاطّلاع على أداة التأهّل لتنفيذ طلبات البحث الأصلية
- يمكنك الاطّلاع على استعلامات قياس الأداء الكاملة الخاصة بمعيار TPC-DS على GitHub.