1. Introduzione
In questo codelab, esplorerai i vantaggi in termini di prestazioni del motore di esecuzione nativo di Google Cloud Serverless per Apache Spark, Lightning Engine, ed esaminerai come ottimizza i tuoi workload Spark su Serverless per Apache Spark.
Lightning Engine utilizza Velox e Apache Gluten. Velox è un motore C++ ad alte prestazioni per l'elaborazione dei dati. Apache Gluten è un livello intermedio responsabile della conversione dei job Spark basati su JVM in codice C++ che può essere eseguito da Velox.
Questa demo utilizza TPC-DS, un benchmark standard del settore progettato per valutare le prestazioni dei sistemi di supporto decisionale. Invierai un job PySpark di base per eseguire query su un set di dati TPC-DS di esempio utilizzando il livello Serverless Standard. Poi, eseguirai lo stesso job utilizzando il livello Premium con Lightning Engine abilitato. Infine, confronterai il tempo di esecuzione ed esaminerai l'interfaccia utente di Spark per visualizzare la differenza nei grafici di esecuzione di Spark con accelerazione hardware.
Il costo stimato per eseguire questo codelab è inferiore a 1,00$, presupponendo che le risorse vengano liberate tempestivamente come descritto nella sezione Liberare spazio.
In questo lab proverai a:
- Creare un bucket Cloud Storage per archiviare gli script e i risultati dei benchmark
- Eseguire un job di elaborazione dei dati PySpark di base utilizzando il livello Serverless per Apache Spark Standard
- Eseguire lo stesso job utilizzando il livello Serverless per Apache Spark Premium con Lightning Engine
- Confrontare le metriche di runtime
- Avviare l'interfaccia utente del server di cronologia Spark per confrontare i grafici di esecuzione fisica nativa
Che cosa ti serve
- Un browser web come Chrome
- Un progetto Google Cloud con la fatturazione abilitata
- Conoscenza di base di Apache Spark e della riga di comando Linux
2. Prima di iniziare
Crea un progetto Google Cloud
- Nella console Google Cloud, nella pagina di selezione del progetto, seleziona o crea un progetto Google Cloud.
- Verifica che la fatturazione sia attivata per il tuo progetto Cloud. Scopri come verificare se la fatturazione è abilitata per un progetto.
Avvia Cloud Shell
Cloud Shell è un ambiente a riga di comando in esecuzione in Google Cloud che viene fornito con gli strumenti necessari precaricati.
- Fai clic su Attiva Cloud Shell nella parte superiore della console Google Cloud.
- Una volta eseguita la connessione a Cloud Shell, verifica l'autenticazione:
gcloud auth list - Verifica che il progetto sia configurato:
gcloud config get project - Se il progetto non è impostato come previsto, impostalo:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Abilita API
Esegui questo comando per abilitare tutte le API richieste per questo codelab:
gcloud services enable \
dataproc.googleapis.com \
storage.googleapis.com \
compute.googleapis.com
3. Prepara l'ambiente
In questo passaggio, inizializzerai le variabili di ambiente e creerai un bucket Cloud Storage. Questo bucket conterrà lo script PySpark che invierai a entrambi i livelli Serverless per Apache Spark.
Imposta le variabili di ambiente
Esegui i seguenti comandi in Cloud Shell per impostare le variabili di ambiente predefinite. Utilizzeremo la regione us-central1, ma puoi modificarla se preferisci.
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"
gcloud config set dataproc/region ${REGION}
Crea un bucket Cloud Storage
Crea il bucket per contenere gli script e i log:
gcloud storage buckets create gs://${BUCKET_NAME} \
--uniform-bucket-level-access \
--location=${REGION}
Copia il set di dati TPC-DS nel tuo bucket
In questo passaggio, copierai il set di dati TPC-DS da un bucket pubblico al tuo bucket Cloud Storage. In questo modo, i job PySpark possono leggere i dati localmente dal tuo progetto.
Imposta le variabili di ambiente per scegliere le dimensioni e il tipo del set di dati:
export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB" # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)
export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"
Copia i dati TPC-DS nel tuo bucket:
gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/
Crea lo script di benchmark PySpark
Utilizzeremo uno script PySpark che registra le tabelle TPC-DS standard dal tuo bucket Cloud Storage ed esegue 5 query standard provenienti dal repository pubblico di Apache Spark. Lo script accetta il percorso del set di dati come argomento.
Crea un file denominato benchmark.py in Cloud Shell. Puoi copiare e incollare il seguente comando per generare il file:
cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time
def main():
parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
args = parser.parse_args()
base_path = args.data_path
# Initialize Spark Session
spark = SparkSession.builder \
.appName("TPC-DS Benchmark") \
.getOrCreate()
print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")
# List of all 24 TPC-DS tables
tables = [
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
"customer", "customer_address", "customer_demographics", "date_dim",
"household_demographics", "income_band", "inventory", "item",
"promotion", "reason", "ship_mode", "store", "store_returns",
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
"web_sales", "web_site"
]
# Register each table as a temporary view
# For this subset of queries, not every table is used
for table in tables:
path = f"{base_path}/{table}"
try:
df = spark.read.parquet(path)
df.createOrReplaceTempView(table)
except Exception as e:
print(f"Warning: Could not load table {table} from {path}. Error: {e}")
print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")
# Standard TPC-DS Queries sourced from Apache Spark public repository:
# https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
queries = {
"Q1": """
WITH customer_total_return AS (
SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
sum(sr_return_amt) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2000
GROUP BY sr_customer_sk, sr_store_sk
)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (
SELECT avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
""",
"Q2": """
WITH wscs AS (
SELECT sold_date_sk, sales_price
FROM (
SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
FROM web_sales
UNION ALL
SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
FROM catalog_sales
)
),
wswscs AS (
SELECT d_week_seq,
sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
FROM wscs, date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq
)
SELECT d_week_seq1,
round(sun_sales1/sun_sales2, 2),
round(mon_sales1/mon_sales2, 2),
round(tue_sales1/tue_sales2, 2),
round(wed_sales1/wed_sales2, 2),
round(thu_sales1/thu_sales2, 2),
round(fri_sales1/fri_sales2, 2),
round(sat_sales1/sat_sales2, 2)
FROM (
SELECT wswscs.d_week_seq AS d_week_seq1,
sun_sales AS sun_sales1, mon_sales AS mon_sales1,
tue_sales AS tue_sales1, wed_sales AS wed_sales1,
thu_sales AS thu_sales1, fri_sales AS fri_sales1,
sat_sales AS sat_sales1
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001
) y,
(
SELECT wswscs.d_week_seq AS d_week_seq2,
sun_sales AS sun_sales2, mon_sales AS mon_sales2,
tue_sales AS tue_sales2, wed_sales AS wed_sales2,
thu_sales AS thu_sales2, fri_sales AS fri_sales2,
sat_sales AS sat_sales2
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001 + 1
) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1
""",
"Q3": """
SELECT dt.d_year,
item.i_brand_id AS brand_id,
item.i_brand AS brand,
sum(ss_ext_sales_price) AS sum_agg
FROM date_dim dt,
store_sales,
item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 436
AND dt.d_moy = 12
GROUP BY dt.d_year,
item.i_brand,
item.i_brand_id
ORDER BY dt.d_year,
sum_agg DESC,
brand_id
LIMIT 100
""",
"Q7": """
SELECT i_item_id,
avg(ss_quantity) AS agg1,
avg(ss_list_price) AS agg2,
avg(ss_coupon_amt) AS agg3,
avg(ss_sales_price) AS agg4
FROM store_sales,
customer_demographics,
date_dim,
item,
promotion
WHERE ss_sold_date_sk = d_date_sk
AND ss_item_sk = i_item_sk
AND ss_cdemo_sk = cd_demo_sk
AND ss_promo_sk = p_promo_sk
AND cd_gender = 'M'
AND cd_marital_status = 'S'
AND cd_education_status = 'College'
AND (p_channel_email = 'N' OR p_channel_event = 'N')
AND d_year = 2000
GROUP BY i_item_id
ORDER BY i_item_id
LIMIT 100
""",
"Q19": """
SELECT i_item_id,
i_brand,
i_category,
i_class,
i_manufact,
sum(ss_ext_sales_price) AS sales,
sum(ss_net_profit) AS profit
FROM date_dim,
store_sales,
item,
customer,
store
WHERE d_date_sk = ss_sold_date_sk
AND i_item_sk = ss_item_sk
AND d_year = 2000
AND d_moy = 12
AND c_customer_sk = ss_customer_sk
AND s_store_sk = ss_store_sk
AND i_manager_id = 9
GROUP BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
ORDER BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
LIMIT 100
"""
}
total_start_time = time.time()
for query_name, query_sql in queries.items():
print(f"\nExecuting {query_name}...")
query_start = time.time()
# Execute query and force action using show()
result_df = spark.sql(query_sql)
result_df.show(5) # Show top 5 rows
query_end = time.time()
print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")
total_end_time = time.time()
print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")
spark.stop()
if __name__ == "__main__":
main()
EOF
Copia lo script nel bucket Cloud Storage in modo che Serverless per Apache Spark possa accedervi:
gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py
4. Esegui il job Serverless di base
Per fornire un confronto di base senza Lightning Engine, invia il job di benchmarking PySpark che hai caricato in precedenza al livello Serverless per Apache Spark Standard. Passeremo il percorso del set di dati che hai copiato come argomento.
Esegui il comando seguente per eseguire il job batch:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
-- ${DATASET_PATH}
Monitora il job
Durante l'esecuzione del job, vedrai i log PySpark in streaming nel terminale Cloud Shell. Serverless per Apache Spark sta allocando i container, leggendo il set di dati Parquet TPC-DS da Cloud Storage ed eseguendo i piani SQL complessi.
Al termine dello script, osserva l'output della console. Dovresti vedere i risultati e i tempi per ogni query standard eseguita, simili a:
... Executing Q1... +-------------+ |c_customer_id| +-------------+ ... Q1 completed in 18.52 seconds. ... All benchmark queries completed in 110.94 seconds.
Prendi nota del numero totale di secondi necessari per il completamento. Questo è il runtime di base.
5. Esegui con Serverless Premium e Lightning Engine
Poi, eseguirai lo stesso job Spark su Serverless per Apache Spark, ma utilizzando il livello Premium e abilitando il motore di query vettorializzato nativo di Google: Lightning Engine.
Invia il job di benchmark a Serverless con Lightning Engine abilitato in modo esplicito:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
--properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
-- ${DATASET_PATH}
Confronta i risultati
Attendi il completamento del job ed esamina l'output. Dovresti vedere gli stessi risultati della query. Esamina attentamente il tempo di completamento:
... All benchmark queries completed in 64.24 seconds.
Confrontando l'esecuzione di base di Serverless con l'esecuzione di Serverless Lightning Engine, noterai che Lightning Engine esegue i raggruppamenti, le aggregazioni e i join più rapidamente utilizzando un livello di esecuzione C++ nativo e l'elaborazione vettorializzata sul backend, senza richiedere modifiche al codice dell'applicazione PySpark.
Lightning Engine è ottimizzato per aumentare le prestazioni quanto più grande è il workload. In questo esempio utilizziamo un set di dati di piccole dimensioni, quindi l'aumento delle prestazioni non è così drastico come potrebbe essere. Su un set di dati da 10 TB, i benchmark hanno dimostrato un miglioramento delle prestazioni fino a 4,3 volte rispetto a Spark open source.
6. Confronta i grafici di esecuzione nell'interfaccia utente di Spark
La riduzione del runtime è impressionante, ma esaminiamo sotto il cofano cosa fa effettivamente Spark durante l'esecuzione della query. Puoi farlo esaminando i grafici di esecuzione dell'interfaccia utente di Spark per entrambi i job.
- Apri la console Google Cloud nel browser.
- Vai a Dataproc > Batch.
- Nell'elenco vedrai due batch: l'esecuzione di base standard e l'esecuzione del livello Premium.
- Fai clic sul batch del livello Premium che hai eseguito, poi su Visualizza interfaccia utente di Spark e infine su Visualizza dettagli.
- Nell'interfaccia utente di Spark, vai alla scheda Job.
- In Job completati, nella casella di ricerca, digita
Velox. - Vedrai molte descrizioni dei job che includono
VeloxSparkPlanExecApi. Si riferisce al motore di esecuzione nativo di Velox utilizzato da Lightning Engine.
Ora, ripeti questa procedura per l'esecuzione del livello Standard:
- Torna alla pagina Batch di Serverless per Apache Spark.
- Fai clic sul link per il batch del livello Standard, poi su Visualizza interfaccia utente di Spark e infine su Visualizza dettagli.
- Nell'interfaccia utente di Spark, vai alla scheda Job.
- In Job completati, nella casella di ricerca, digita
Velox. - Non vedrai alcun riferimento all'API Velox nelle descrizioni dei job.
7. Libera spazio
Per evitare addebiti continui sul tuo account Google Cloud, elimina le risorse create durante questo codelab.
In Cloud Shell, elimina il bucket Cloud Storage e i relativi contenuti:
gcloud storage rm -r gs://${BUCKET_NAME}
Elimina la copia locale di benchmark.py:
rm benchmark.py
8. Complimenti
Complimenti! Hai creato correttamente un ambiente di benchmarking per Apache Spark e hai confrontato Serverless per Apache Spark Standard con Serverless per Apache Spark Premium.
Hai visto in prima persona come l'abilitazione del nuovo Lightning Engine di Serverless per Apache Spark può ridurre il runtime del workload Spark e hai esplorato l'interfaccia utente di Spark per vedere come il grafico di esecuzione fisica viene trasformato in codice C++ nativo utilizzando il motore di query nativo.
Che cosa hai imparato
- Come scrivere uno script di benchmarking del set di dati PySpark.
- Come inviare job Spark a Serverless per Apache Spark.
- Come abilitare Lightning Engine.
- Come confrontare i piani dei job nell'interfaccia utente di Spark.
Passaggi successivi
- Esplora la documentazione di Serverless per Apache Spark
- Consulta lo strumento di qualificazione dell'esecuzione di query native
- Dai un'occhiata alle query di benchmarking TPC-DS complete su GitHub