Accélérer Spark avec Serverless pour Apache Spark et Lightning Engine

1. Introduction

Dans cet atelier de programmation, vous allez découvrir les avantages en termes de performances du moteur d'exécution natif de Google Cloud Serverless pour Apache Spark, Lightning Engine, et examiner comment il optimise vos charges de travail Spark sur Serverless pour Apache Spark.

Lightning Engine utilise Velox et Apache Gluten. Velox est un moteur C++ hautes performances pour le traitement des données. Apache Gluten est une couche intermédiaire chargée de convertir les jobs Spark basés sur JVM en code C++ pouvant être exécuté par Velox.

Cette démonstration utilise TPC-DS, un benchmark standard conçu pour évaluer les performances des systèmes d'aide à la décision. Vous allez envoyer un job PySpark de référence pour interroger un exemple d'ensemble de données TPC-DS à l'aide du niveau Standard Serverless. Ensuite, vous exécuterez exactement le même job à l'aide du niveau Premium avec Lightning Engine activé. Enfin, vous comparerez le temps d'exécution et explorerez l'UI Spark pour visualiser la différence dans les graphiques d'exécution Spark accélérés par le matériel.

Le coût estimé de cet atelier de programmation est inférieur à 1,00 USD, en supposant que les ressources sont nettoyées rapidement, comme décrit dans la section Effectuer un nettoyage.

Objectifs de l'atelier

  • Créer un Cloud Storage bucket pour stocker vos scripts et résultats de benchmark
  • Exécuter un job de traitement de données PySpark de référence à l'aide du niveau Standard Serverless pour Apache Spark
  • Exécuter le même job à l'aide du niveau Premium Serverless pour Apache Spark avec Lightning Engine
  • Comparer les métriques d'exécution
  • Lancer l'UI du serveur d'historique Spark pour comparer les graphiques d'exécution physique natifs

Ce dont vous avez besoin

2. Avant de commencer

Créer un projet Google Cloud

  1. Dans la console Google Cloud, sur la page du sélecteur de projet, sélectionnez ou créez un projet Google Cloud.
  2. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier si la facturation est activée pour un projet.

Démarrer Cloud Shell

Cloud Shell est un environnement de ligne de commande exécuté dans Google Cloud, qui est préchargé avec les outils nécessaires.

  1. Cliquez sur Activer Cloud Shell en haut de la console Google Cloud.
  2. Une fois connecté à Cloud Shell, vérifiez votre authentification :
    gcloud auth list
    
  3. Vérifiez que votre projet est configuré :
    gcloud config get project
    
  4. Si votre projet n'est pas défini comme prévu, définissez-le :
    export PROJECT_ID=<YOUR_PROJECT_ID>
    gcloud config set project $PROJECT_ID
    

Activer les API

Exécutez cette commande pour activer toutes les API requises pour cet atelier de programmation :

gcloud services enable \
    dataproc.googleapis.com \
    storage.googleapis.com \
    compute.googleapis.com

3. Préparer votre environnement

Dans cette étape, vous allez initialiser les variables d'environnement et créer un bucket Cloud Storage. Ce bucket contiendra le script PySpark que vous enverrez aux deux niveaux Serverless pour Apache Spark.

Définir des variables d'environnement

Exécutez les commandes suivantes dans Cloud Shell pour définir les variables d'environnement par défaut. Nous utiliserons la région us-central1, mais vous pouvez la modifier si vous le souhaitez.

export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"

gcloud config set dataproc/region ${REGION}

Créer un bucket Cloud Storage

Créez le bucket pour stocker vos scripts et journaux :

gcloud storage buckets create gs://${BUCKET_NAME} \
    --uniform-bucket-level-access \
    --location=${REGION}

Copier l'ensemble de données TPC-DS dans votre propre bucket

Dans cette étape, vous allez copier l'ensemble de données TPC-DS d'un bucket public vers votre propre bucket Cloud Storage. Ainsi, vos jobs PySpark pourront lire les données localement à partir de votre projet.

Définissez les variables d'environnement pour choisir la taille et le type de l'ensemble de données :

export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB"         # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)

export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"

Copiez les données TPC-DS dans votre propre bucket :

gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/

Créer le script de benchmark PySpark

Nous allons utiliser un script PySpark qui enregistre les tables TPC-DS standards de votre bucket Cloud Storage et exécute cinq requêtes standards provenant du dépôt public Apache Spark. Le script accepte le chemin d'accès à votre ensemble de données comme argument.

Créez un fichier nommé benchmark.py dans Cloud Shell. Vous pouvez copier et coller la commande suivante pour générer le fichier :

cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time

def main():
    parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
    parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
    args = parser.parse_args()

    base_path = args.data_path

    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("TPC-DS Benchmark") \
        .getOrCreate()

    print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")

    # List of all 24 TPC-DS tables
    tables = [
        "call_center", "catalog_page", "catalog_returns", "catalog_sales",
        "customer", "customer_address", "customer_demographics", "date_dim",
        "household_demographics", "income_band", "inventory", "item",
        "promotion", "reason", "ship_mode", "store", "store_returns",
        "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
        "web_sales", "web_site"
    ]

    # Register each table as a temporary view
    # For this subset of queries, not every table is used
    for table in tables:
        path = f"{base_path}/{table}"
        try:
            df = spark.read.parquet(path)
            df.createOrReplaceTempView(table)
        except Exception as e:
            print(f"Warning: Could not load table {table} from {path}. Error: {e}")

    print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")

    # Standard TPC-DS Queries sourced from Apache Spark public repository:
    # https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
    queries = {
        "Q1": """
            WITH customer_total_return AS (
              SELECT sr_customer_sk AS ctr_customer_sk,
                     sr_store_sk AS ctr_store_sk,
                     sum(sr_return_amt) AS ctr_total_return
              FROM store_returns, date_dim
              WHERE sr_returned_date_sk = d_date_sk
                AND d_year = 2000
              GROUP BY sr_customer_sk, sr_store_sk
            )
            SELECT c_customer_id
            FROM customer_total_return ctr1, store, customer
            WHERE ctr1.ctr_total_return > (
              SELECT avg(ctr_total_return) * 1.2
              FROM customer_total_return ctr2
              WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
            )
              AND s_store_sk = ctr1.ctr_store_sk
              AND s_state = 'TN'
              AND ctr1.ctr_customer_sk = c_customer_sk
            ORDER BY c_customer_id
            LIMIT 100
        """,
        "Q2": """
            WITH wscs AS (
              SELECT sold_date_sk, sales_price
              FROM (
                SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
                FROM web_sales
                UNION ALL
                SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
                FROM catalog_sales
              )
            ),
            wswscs AS (
              SELECT d_week_seq,
                     sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
                     sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
                     sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
                     sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
                     sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
                     sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
                     sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
              FROM wscs, date_dim
              WHERE d_date_sk = sold_date_sk
              GROUP BY d_week_seq
            )
            SELECT d_week_seq1,
                   round(sun_sales1/sun_sales2, 2),
                   round(mon_sales1/mon_sales2, 2),
                   round(tue_sales1/tue_sales2, 2),
                   round(wed_sales1/wed_sales2, 2),
                   round(thu_sales1/thu_sales2, 2),
                   round(fri_sales1/fri_sales2, 2),
                   round(sat_sales1/sat_sales2, 2)
            FROM (
              SELECT wswscs.d_week_seq AS d_week_seq1,
                     sun_sales AS sun_sales1, mon_sales AS mon_sales1,
                     tue_sales AS tue_sales1, wed_sales AS wed_sales1,
                     thu_sales AS thu_sales1, fri_sales AS fri_sales1,
                     sat_sales AS sat_sales1
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001
            ) y,
            (
              SELECT wswscs.d_week_seq AS d_week_seq2,
                     sun_sales AS sun_sales2, mon_sales AS mon_sales2,
                     tue_sales AS tue_sales2, wed_sales AS wed_sales2,
                     thu_sales AS thu_sales2, fri_sales AS fri_sales2,
                     sat_sales AS sat_sales2
              FROM wswscs, date_dim
              WHERE date_dim.d_week_seq = wswscs.d_week_seq
                AND d_year = 2001 + 1
            ) z
            WHERE d_week_seq1 = d_week_seq2 - 53
            ORDER BY d_week_seq1
        """,
        "Q3": """
            SELECT dt.d_year,
                   item.i_brand_id AS brand_id,
                   item.i_brand AS brand,
                   sum(ss_ext_sales_price) AS sum_agg
            FROM date_dim dt,
                 store_sales,
                 item
            WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
              AND store_sales.ss_item_sk = item.i_item_sk
              AND item.i_manufact_id = 436
              AND dt.d_moy = 12
            GROUP BY dt.d_year,
                     item.i_brand,
                     item.i_brand_id
            ORDER BY dt.d_year,
                     sum_agg DESC,
                     brand_id
            LIMIT 100
        """,
        "Q7": """
            SELECT i_item_id,
                   avg(ss_quantity) AS agg1,
                   avg(ss_list_price) AS agg2,
                   avg(ss_coupon_amt) AS agg3,
                   avg(ss_sales_price) AS agg4
            FROM store_sales,
                 customer_demographics,
                 date_dim,
                 item,
                 promotion
            WHERE ss_sold_date_sk = d_date_sk
              AND ss_item_sk = i_item_sk
              AND ss_cdemo_sk = cd_demo_sk
              AND ss_promo_sk = p_promo_sk
              AND cd_gender = 'M'
              AND cd_marital_status = 'S'
              AND cd_education_status = 'College'
              AND (p_channel_email = 'N' OR p_channel_event = 'N')
              AND d_year = 2000
            GROUP BY i_item_id
            ORDER BY i_item_id
            LIMIT 100
        """,
        "Q19": """
            SELECT i_item_id,
                   i_brand,
                   i_category,
                   i_class,
                   i_manufact,
                   sum(ss_ext_sales_price) AS sales,
                   sum(ss_net_profit) AS profit
            FROM date_dim,
                 store_sales,
                 item,
                 customer,
                 store
            WHERE d_date_sk = ss_sold_date_sk
              AND i_item_sk = ss_item_sk
              AND d_year = 2000
              AND d_moy = 12
              AND c_customer_sk = ss_customer_sk
              AND s_store_sk = ss_store_sk
              AND i_manager_id = 9
            GROUP BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            ORDER BY i_item_id,
                     i_brand,
                     i_category,
                     i_class,
                     i_manufact
            LIMIT 100
        """
    }

    total_start_time = time.time()

    for query_name, query_sql in queries.items():
        print(f"\nExecuting {query_name}...")
        query_start = time.time()

        # Execute query and force action using show()
        result_df = spark.sql(query_sql)
        result_df.show(5) # Show top 5 rows

        query_end = time.time()
        print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")

    total_end_time = time.time()
    print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")

    spark.stop()

if __name__ == "__main__":
    main()
EOF

Copiez le script dans votre bucket Cloud Storage afin que Serverless pour Apache Spark puisse y accéder :

gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py

4. Exécuter le job Serverless de référence

Pour fournir une comparaison de référence sans Lightning Engine, envoyez le job de benchmark PySpark que vous avez importé précédemment au niveau Standard Serverless pour Apache Spark. Nous transmettrons le chemin d'accès à l'ensemble de données que vous avez copié comme argument.

Exécutez la commande suivante pour exécuter le job par lot :

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    -- ${DATASET_PATH}

Surveiller le job

Pendant l'exécution du job, vous verrez les journaux PySpark être diffusés en streaming dans votre terminal Cloud Shell. Serverless pour Apache Spark alloue des conteneurs, lit l'ensemble de données TPC-DS Parquet à partir de Cloud Storage et exécute les plans SQL complexes.

Une fois le script terminé, observez la sortie de la console. Vous devriez voir les résultats et les durées de chaque requête standard exécutée, comme suit :

...
Executing Q1...
+-------------+
|c_customer_id|
+-------------+
...

Q1 completed in 18.52 seconds.
...

All benchmark queries completed in 110.94 seconds.

Notez le nombre total de secondes nécessaires pour terminer l'opération. Il s'agit de votre durée d'exécution de référence.

5. Exécuter avec Serverless Premium et Lightning Engine

Ensuite, vous exécuterez exactement le même job Spark sur Serverless pour Apache Spark, mais en utilisant le niveau Premium et en activant le moteur de requête vectorisé natif de Google : Lightning Engine.

Envoyez le job de benchmark à Serverless avec Lightning Engine explicitement activé :

gcloud dataproc batches submit pyspark \
    gs://${BUCKET_NAME}/scripts/benchmark.py \
    --region=${REGION} \
    --version=2.3 \
    --deps-bucket=gs://${BUCKET_NAME} \
    --properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
    -- ${DATASET_PATH}

Comparer les résultats

Attendez la fin du job et examinez la sortie. Vous devriez voir les mêmes résultats de requête. Examinez attentivement le temps d'exécution :

...
All benchmark queries completed in 64.24 seconds.

En comparant l'exécution Serverless de référence à l'exécution Serverless Lightning Engine, vous remarquerez que Lightning Engine exécute le regroupement, les agrégations et les jointures plus rapidement en utilisant une couche d'exécution C++ native et un traitement vectorisé sur le backend, sans nécessiter de modifications du code de votre application PySpark.

Lightning Engine est optimisé pour améliorer les performances à mesure que la charge de travail augmente. Dans cet exemple, nous utilisons un petit ensemble de données. L'amélioration des performances n'est donc pas aussi spectaculaire qu'elle pourrait l'être. Sur un ensemble de données de 10 To, les benchmarks ont montré une amélioration des performances jusqu'à 4,3 fois supérieure à celle de Spark Open Source.

6. Comparer les graphiques d'exécution dans l'UI Spark

La réduction du temps d'exécution est impressionnante, mais examinons en coulisses ce que Spark fait réellement lors de l'exécution des requêtes. Pour ce faire, examinez les graphiques d'exécution de l'UI Spark pour les deux jobs.

  1. Ouvrez la Google Cloud Console dans votre navigateur.
  2. Accédez à Dataproc > Lots.
  3. Deux lots s'affichent dans la liste : votre exécution de référence standard et votre exécution de niveau Premium.
  4. Cliquez sur le lot de niveau Premium que vous avez exécuté, puis sur Afficher l'UI Spark et enfin sur Afficher les détails.
  5. Dans l'UI Spark, accédez à l'onglet Jobs (Jobs).
  6. Sous Completed Jobs (Jobs terminés), dans le champ de recherche, saisissez Velox.
  7. Vous verrez de nombreuses descriptions de jobs qui incluent VeloxSparkPlanExecApi. Cela fait référence au moteur d'exécution natif Velox utilisé par Lightning Engine.

Répétez maintenant ce processus pour l'exécution de niveau Standard :

  1. Revenez à la page "Lots" de Serverless pour Apache Spark.
  2. Cliquez sur le lien du lot de niveau Standard, puis sur Afficher l'UI Spark et enfin sur Afficher les détails.
  3. Dans l'UI Spark, accédez à l'onglet Jobs (Jobs).
  4. Sous Completed Jobs (Jobs terminés), dans le champ de recherche, saisissez Velox.
  5. Vous ne verrez aucune mention de l'API Velox dans les descriptions des jobs.

7. Effectuer un nettoyage

Pour éviter que votre compte Google Cloud ne soit facturé en permanence, supprimez les ressources créées lors de cet atelier de programmation.

Dans Cloud Shell, supprimez le bucket Cloud Storage et son contenu :

gcloud storage rm -r gs://${BUCKET_NAME}

Supprimez votre copie locale de benchmark.py :

rm benchmark.py

8. Félicitations

Félicitations ! Vous avez créé un environnement de benchmark pour Apache Spark et comparé Serverless pour Apache Spark Standard à Serverless pour Apache Spark Premium.

Vous avez vu directement comment l'activation du nouveau Lightning Engine de Serverless pour Apache Spark peut réduire le temps d'exécution de votre charge de travail Spark. Vous avez également exploré l'UI Spark pour voir comment le graphique d'exécution physique est transformé en code C++ natif à l'aide du moteur de requêtes natif.

Connaissances acquises

  • Comment écrire un script de benchmark d'ensemble de données PySpark
  • Comment envoyer des jobs Spark à Serverless pour Apache Spark
  • Comment activer Lightning Engine
  • Comment comparer les plans de jobs dans l'UI Spark

Étapes suivantes