1. Introduction
Dans cet atelier de programmation, vous allez découvrir les avantages en termes de performances du moteur d'exécution natif de Google Cloud Serverless pour Apache Spark, Lightning Engine, et examiner comment il optimise vos charges de travail Spark sur Serverless pour Apache Spark.
Lightning Engine utilise Velox et Apache Gluten. Velox est un moteur C++ hautes performances pour le traitement des données. Apache Gluten est une couche intermédiaire chargée de convertir les jobs Spark basés sur JVM en code C++ pouvant être exécuté par Velox.
Cette démonstration utilise TPC-DS, un benchmark standard conçu pour évaluer les performances des systèmes d'aide à la décision. Vous allez envoyer un job PySpark de référence pour interroger un exemple d'ensemble de données TPC-DS à l'aide du niveau Standard Serverless. Ensuite, vous exécuterez exactement le même job à l'aide du niveau Premium avec Lightning Engine activé. Enfin, vous comparerez le temps d'exécution et explorerez l'UI Spark pour visualiser la différence dans les graphiques d'exécution Spark accélérés par le matériel.
Le coût estimé de cet atelier de programmation est inférieur à 1,00 USD, en supposant que les ressources sont nettoyées rapidement, comme décrit dans la section Effectuer un nettoyage.
Objectifs de l'atelier
- Créer un Cloud Storage bucket pour stocker vos scripts et résultats de benchmark
- Exécuter un job de traitement de données PySpark de référence à l'aide du niveau Standard Serverless pour Apache Spark
- Exécuter le même job à l'aide du niveau Premium Serverless pour Apache Spark avec Lightning Engine
- Comparer les métriques d'exécution
- Lancer l'UI du serveur d'historique Spark pour comparer les graphiques d'exécution physique natifs
Ce dont vous avez besoin
- Un navigateur Web (par exemple, Chrome)
- Un projet Google Cloud avec facturation activée
- Connaissances de base de Apache Spark et de la ligne de commande Linux
2. Avant de commencer
Créer un projet Google Cloud
- Dans la console Google Cloud, sur la page du sélecteur de projet, sélectionnez ou créez un projet Google Cloud.
- Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier si la facturation est activée pour un projet.
Démarrer Cloud Shell
Cloud Shell est un environnement de ligne de commande exécuté dans Google Cloud, qui est préchargé avec les outils nécessaires.
- Cliquez sur Activer Cloud Shell en haut de la console Google Cloud.
- Une fois connecté à Cloud Shell, vérifiez votre authentification :
gcloud auth list - Vérifiez que votre projet est configuré :
gcloud config get project - Si votre projet n'est pas défini comme prévu, définissez-le :
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
Activer les API
Exécutez cette commande pour activer toutes les API requises pour cet atelier de programmation :
gcloud services enable \
dataproc.googleapis.com \
storage.googleapis.com \
compute.googleapis.com
3. Préparer votre environnement
Dans cette étape, vous allez initialiser les variables d'environnement et créer un bucket Cloud Storage. Ce bucket contiendra le script PySpark que vous enverrez aux deux niveaux Serverless pour Apache Spark.
Définir des variables d'environnement
Exécutez les commandes suivantes dans Cloud Shell pour définir les variables d'environnement par défaut. Nous utiliserons la région us-central1, mais vous pouvez la modifier si vous le souhaitez.
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"
gcloud config set dataproc/region ${REGION}
Créer un bucket Cloud Storage
Créez le bucket pour stocker vos scripts et journaux :
gcloud storage buckets create gs://${BUCKET_NAME} \
--uniform-bucket-level-access \
--location=${REGION}
Copier l'ensemble de données TPC-DS dans votre propre bucket
Dans cette étape, vous allez copier l'ensemble de données TPC-DS d'un bucket public vers votre propre bucket Cloud Storage. Ainsi, vos jobs PySpark pourront lire les données localement à partir de votre projet.
Définissez les variables d'environnement pour choisir la taille et le type de l'ensemble de données :
export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB" # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)
export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"
Copiez les données TPC-DS dans votre propre bucket :
gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/
Créer le script de benchmark PySpark
Nous allons utiliser un script PySpark qui enregistre les tables TPC-DS standards de votre bucket Cloud Storage et exécute cinq requêtes standards provenant du dépôt public Apache Spark. Le script accepte le chemin d'accès à votre ensemble de données comme argument.
Créez un fichier nommé benchmark.py dans Cloud Shell. Vous pouvez copier et coller la commande suivante pour générer le fichier :
cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time
def main():
parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
args = parser.parse_args()
base_path = args.data_path
# Initialize Spark Session
spark = SparkSession.builder \
.appName("TPC-DS Benchmark") \
.getOrCreate()
print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")
# List of all 24 TPC-DS tables
tables = [
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
"customer", "customer_address", "customer_demographics", "date_dim",
"household_demographics", "income_band", "inventory", "item",
"promotion", "reason", "ship_mode", "store", "store_returns",
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
"web_sales", "web_site"
]
# Register each table as a temporary view
# For this subset of queries, not every table is used
for table in tables:
path = f"{base_path}/{table}"
try:
df = spark.read.parquet(path)
df.createOrReplaceTempView(table)
except Exception as e:
print(f"Warning: Could not load table {table} from {path}. Error: {e}")
print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")
# Standard TPC-DS Queries sourced from Apache Spark public repository:
# https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
queries = {
"Q1": """
WITH customer_total_return AS (
SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
sum(sr_return_amt) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2000
GROUP BY sr_customer_sk, sr_store_sk
)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (
SELECT avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
""",
"Q2": """
WITH wscs AS (
SELECT sold_date_sk, sales_price
FROM (
SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
FROM web_sales
UNION ALL
SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
FROM catalog_sales
)
),
wswscs AS (
SELECT d_week_seq,
sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
FROM wscs, date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq
)
SELECT d_week_seq1,
round(sun_sales1/sun_sales2, 2),
round(mon_sales1/mon_sales2, 2),
round(tue_sales1/tue_sales2, 2),
round(wed_sales1/wed_sales2, 2),
round(thu_sales1/thu_sales2, 2),
round(fri_sales1/fri_sales2, 2),
round(sat_sales1/sat_sales2, 2)
FROM (
SELECT wswscs.d_week_seq AS d_week_seq1,
sun_sales AS sun_sales1, mon_sales AS mon_sales1,
tue_sales AS tue_sales1, wed_sales AS wed_sales1,
thu_sales AS thu_sales1, fri_sales AS fri_sales1,
sat_sales AS sat_sales1
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001
) y,
(
SELECT wswscs.d_week_seq AS d_week_seq2,
sun_sales AS sun_sales2, mon_sales AS mon_sales2,
tue_sales AS tue_sales2, wed_sales AS wed_sales2,
thu_sales AS thu_sales2, fri_sales AS fri_sales2,
sat_sales AS sat_sales2
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001 + 1
) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1
""",
"Q3": """
SELECT dt.d_year,
item.i_brand_id AS brand_id,
item.i_brand AS brand,
sum(ss_ext_sales_price) AS sum_agg
FROM date_dim dt,
store_sales,
item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 436
AND dt.d_moy = 12
GROUP BY dt.d_year,
item.i_brand,
item.i_brand_id
ORDER BY dt.d_year,
sum_agg DESC,
brand_id
LIMIT 100
""",
"Q7": """
SELECT i_item_id,
avg(ss_quantity) AS agg1,
avg(ss_list_price) AS agg2,
avg(ss_coupon_amt) AS agg3,
avg(ss_sales_price) AS agg4
FROM store_sales,
customer_demographics,
date_dim,
item,
promotion
WHERE ss_sold_date_sk = d_date_sk
AND ss_item_sk = i_item_sk
AND ss_cdemo_sk = cd_demo_sk
AND ss_promo_sk = p_promo_sk
AND cd_gender = 'M'
AND cd_marital_status = 'S'
AND cd_education_status = 'College'
AND (p_channel_email = 'N' OR p_channel_event = 'N')
AND d_year = 2000
GROUP BY i_item_id
ORDER BY i_item_id
LIMIT 100
""",
"Q19": """
SELECT i_item_id,
i_brand,
i_category,
i_class,
i_manufact,
sum(ss_ext_sales_price) AS sales,
sum(ss_net_profit) AS profit
FROM date_dim,
store_sales,
item,
customer,
store
WHERE d_date_sk = ss_sold_date_sk
AND i_item_sk = ss_item_sk
AND d_year = 2000
AND d_moy = 12
AND c_customer_sk = ss_customer_sk
AND s_store_sk = ss_store_sk
AND i_manager_id = 9
GROUP BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
ORDER BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
LIMIT 100
"""
}
total_start_time = time.time()
for query_name, query_sql in queries.items():
print(f"\nExecuting {query_name}...")
query_start = time.time()
# Execute query and force action using show()
result_df = spark.sql(query_sql)
result_df.show(5) # Show top 5 rows
query_end = time.time()
print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")
total_end_time = time.time()
print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")
spark.stop()
if __name__ == "__main__":
main()
EOF
Copiez le script dans votre bucket Cloud Storage afin que Serverless pour Apache Spark puisse y accéder :
gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py
4. Exécuter le job Serverless de référence
Pour fournir une comparaison de référence sans Lightning Engine, envoyez le job de benchmark PySpark que vous avez importé précédemment au niveau Standard Serverless pour Apache Spark. Nous transmettrons le chemin d'accès à l'ensemble de données que vous avez copié comme argument.
Exécutez la commande suivante pour exécuter le job par lot :
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
-- ${DATASET_PATH}
Surveiller le job
Pendant l'exécution du job, vous verrez les journaux PySpark être diffusés en streaming dans votre terminal Cloud Shell. Serverless pour Apache Spark alloue des conteneurs, lit l'ensemble de données TPC-DS Parquet à partir de Cloud Storage et exécute les plans SQL complexes.
Une fois le script terminé, observez la sortie de la console. Vous devriez voir les résultats et les durées de chaque requête standard exécutée, comme suit :
... Executing Q1... +-------------+ |c_customer_id| +-------------+ ... Q1 completed in 18.52 seconds. ... All benchmark queries completed in 110.94 seconds.
Notez le nombre total de secondes nécessaires pour terminer l'opération. Il s'agit de votre durée d'exécution de référence.
5. Exécuter avec Serverless Premium et Lightning Engine
Ensuite, vous exécuterez exactement le même job Spark sur Serverless pour Apache Spark, mais en utilisant le niveau Premium et en activant le moteur de requête vectorisé natif de Google : Lightning Engine.
Envoyez le job de benchmark à Serverless avec Lightning Engine explicitement activé :
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
--properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
-- ${DATASET_PATH}
Comparer les résultats
Attendez la fin du job et examinez la sortie. Vous devriez voir les mêmes résultats de requête. Examinez attentivement le temps d'exécution :
... All benchmark queries completed in 64.24 seconds.
En comparant l'exécution Serverless de référence à l'exécution Serverless Lightning Engine, vous remarquerez que Lightning Engine exécute le regroupement, les agrégations et les jointures plus rapidement en utilisant une couche d'exécution C++ native et un traitement vectorisé sur le backend, sans nécessiter de modifications du code de votre application PySpark.
Lightning Engine est optimisé pour améliorer les performances à mesure que la charge de travail augmente. Dans cet exemple, nous utilisons un petit ensemble de données. L'amélioration des performances n'est donc pas aussi spectaculaire qu'elle pourrait l'être. Sur un ensemble de données de 10 To, les benchmarks ont montré une amélioration des performances jusqu'à 4,3 fois supérieure à celle de Spark Open Source.
6. Comparer les graphiques d'exécution dans l'UI Spark
La réduction du temps d'exécution est impressionnante, mais examinons en coulisses ce que Spark fait réellement lors de l'exécution des requêtes. Pour ce faire, examinez les graphiques d'exécution de l'UI Spark pour les deux jobs.
- Ouvrez la Google Cloud Console dans votre navigateur.
- Accédez à Dataproc > Lots.
- Deux lots s'affichent dans la liste : votre exécution de référence standard et votre exécution de niveau Premium.
- Cliquez sur le lot de niveau Premium que vous avez exécuté, puis sur Afficher l'UI Spark et enfin sur Afficher les détails.
- Dans l'UI Spark, accédez à l'onglet Jobs (Jobs).
- Sous Completed Jobs (Jobs terminés), dans le champ de recherche, saisissez
Velox. - Vous verrez de nombreuses descriptions de jobs qui incluent
VeloxSparkPlanExecApi. Cela fait référence au moteur d'exécution natif Velox utilisé par Lightning Engine.
Répétez maintenant ce processus pour l'exécution de niveau Standard :
- Revenez à la page "Lots" de Serverless pour Apache Spark.
- Cliquez sur le lien du lot de niveau Standard, puis sur Afficher l'UI Spark et enfin sur Afficher les détails.
- Dans l'UI Spark, accédez à l'onglet Jobs (Jobs).
- Sous Completed Jobs (Jobs terminés), dans le champ de recherche, saisissez
Velox. - Vous ne verrez aucune mention de l'API Velox dans les descriptions des jobs.
7. Effectuer un nettoyage
Pour éviter que votre compte Google Cloud ne soit facturé en permanence, supprimez les ressources créées lors de cet atelier de programmation.
Dans Cloud Shell, supprimez le bucket Cloud Storage et son contenu :
gcloud storage rm -r gs://${BUCKET_NAME}
Supprimez votre copie locale de benchmark.py :
rm benchmark.py
8. Félicitations
Félicitations ! Vous avez créé un environnement de benchmark pour Apache Spark et comparé Serverless pour Apache Spark Standard à Serverless pour Apache Spark Premium.
Vous avez vu directement comment l'activation du nouveau Lightning Engine de Serverless pour Apache Spark peut réduire le temps d'exécution de votre charge de travail Spark. Vous avez également exploré l'UI Spark pour voir comment le graphique d'exécution physique est transformé en code C++ natif à l'aide du moteur de requêtes natif.
Connaissances acquises
- Comment écrire un script de benchmark d'ensemble de données PySpark
- Comment envoyer des jobs Spark à Serverless pour Apache Spark
- Comment activer Lightning Engine
- Comment comparer les plans de jobs dans l'UI Spark
Étapes suivantes
- Explorer la documentation de Serverless pour Apache Spark
- Consulter l'outil de qualification de l'exécution de requêtes natives
- Consulter les requêtes de benchmark TPC-DS complètes sur GitHub