1. Einführung
In diesem Codelab erfahren Sie mehr über die Leistungssteigerungen, die mit der nativen Ausführungs-Engine von Google Cloud Serverless for Apache Spark, der Lightning Engine , möglich sind. Außerdem sehen Sie, wie sie Ihre Spark-Arbeitslasten in Serverless for Apache Spark optimiert.
Die Lightning Engine verwendet Velox und Apache Gluten. Velox ist eine leistungsstarke C++-Engine für die Datenverarbeitung. Apache Gluten ist eine Zwischenschicht, die JVM-basierte Spark-Jobs in C++-Code umwandelt, der von Velox ausgeführt werden kann.
In dieser Demo wird TPC-DS verwendet, ein branchenüblicher Benchmark zur Bewertung der Leistung von Entscheidungsunterstützungssystemen. Sie senden einen PySpark-Baseline-Job, um ein TPC-DS-Beispiel-Dataset mit der Serverless-Stufe „Standard“ abzufragen. Anschließend führen Sie denselben Job mit der Stufe „Premium“ aus, wobei die Lightning Engine aktiviert ist. Zum Schluss vergleichen Sie die Ausführungszeit und sehen sich in der Spark-UI die Unterschiede in den hardwarebeschleunigten Spark-Ausführungsgraphen an.
Die geschätzten Kosten für dieses Codelab betragen weniger als 1,00$, vorausgesetzt, die Ressourcen werden umgehend bereinigt, wie im Abschnitt Bereinigen beschrieben.
Aufgaben
- Einen Cloud Storage-Bucket zum Speichern Ihrer Benchmark-Skripts und -Ergebnisse erstellen
- PySpark-Baseline-Job zur Datenverarbeitung mit der Serverless for Apache Spark-Stufe „Standard“ ausführen
- Denselben Job mit der Serverless for Apache Spark-Stufe „Premium“ und der Lightning Engine ausführen
- Laufzeitmesswerte vergleichen
- Spark-Verlaufsserver-UI starten, um die nativen physischen Ausführungsgraphen zu vergleichen
Voraussetzungen
- Ein Webbrowser wie Chrome
- Ein Google Cloud-Projekt mit aktivierter Abrechnung
- Grundkenntnisse zu Apache Spark und der Linux-Befehlszeile
2. Hinweis
Google Cloud-Projekt erstellen
- Wählen Sie in der Google Cloud Console auf der Seite der Projektauswahl ein Google Cloud-Projekt aus oder erstellen Sie eines.
- Die Abrechnung für das Cloud-Projekt muss aktiviert sein. Informationen zum Prüfen, ob die Abrechnung für ein Projekt aktiviert ist.
Cloud Shell starten
Die Cloud Shell ist eine Befehlszeilenumgebung, die in Google Cloud ausgeführt wird und in der die erforderlichen Tools vorinstalliert sind.
- Klicken Sie oben in der Google Cloud Console auf Cloud Shell aktivieren.
- Sobald die Verbindung mit der Cloud Shell hergestellt ist, prüfen Sie Ihre Authentifizierung:
gcloud auth list - Prüfen Sie, ob Ihr Projekt konfiguriert ist:
gcloud config get project - Wenn Ihr Projekt nicht wie erwartet festgelegt ist, legen Sie es fest:
export PROJECT_ID=<YOUR_PROJECT_ID> gcloud config set project $PROJECT_ID
APIs aktivieren
Führen Sie diesen Befehl aus, um alle erforderlichen APIs für dieses Codelab zu aktivieren:
gcloud services enable \
dataproc.googleapis.com \
storage.googleapis.com \
compute.googleapis.com
3. Umgebung vorbereiten
In diesem Schritt initialisieren Sie Umgebungsvariablen und erstellen einen Cloud Storage-Bucket. Dieser Bucket enthält das PySpark-Skript, das Sie an beide Serverless for Apache Spark-Stufen senden.
Umgebungsvariablen festlegen
Führen Sie die folgenden Befehle in der Cloud Shell aus, um Standardumgebungsvariablen festzulegen. Wir verwenden die Region us-central1, Sie können dies aber bei Bedarf ändern.
export PROJECT_ID=$(gcloud config get-value project)
export REGION="us-central1"
export BUCKET_NAME="spark-benchmark-${PROJECT_ID}-${REGION}"
gcloud config set dataproc/region ${REGION}
Cloud Storage-Bucket erstellen
Erstellen Sie den Bucket für Ihre Skripts und Logs:
gcloud storage buckets create gs://${BUCKET_NAME} \
--uniform-bucket-level-access \
--location=${REGION}
TPC-DS-Dataset in Ihren eigenen Bucket kopieren
In diesem Schritt kopieren Sie das TPC-DS-Dataset aus einem öffentlichen Bucket in Ihren eigenen Cloud Storage-Bucket. So können Ihre PySpark-Jobs Daten lokal aus Ihrem Projekt lesen.
Legen Sie Umgebungsvariablen fest, um die Dataset-Größe und den Dataset-Typ auszuwählen:
export DATASET_TYPE="partitioned" # Options: partitioned, nonpartitioned
export DATASET_SIZE="1GB" # Options: 1GB, 10GB, 100GB, 1000GB (1000GB not available for partitioned)
export SRC_PATH="gs://beam-tpcds/datasets/parquet/${DATASET_TYPE}/${DATASET_SIZE}"
export DATASET_PATH="gs://${BUCKET_NAME}/tpc-ds-dataset/${DATASET_TYPE}/${DATASET_SIZE}"
Kopieren Sie die TPC-DS-Daten in Ihren eigenen Bucket:
gcloud storage cp -r ${SRC_PATH}/* ${DATASET_PATH}/
PySpark-Benchmark-Skript erstellen
Wir verwenden ein PySpark-Skript, das die Standard-TPC-DS-Tabellen aus Ihrem Cloud Storage-Bucket registriert und fünf Standardabfragen aus dem öffentlichen Apache Spark-Repository ausführt. Das Skript akzeptiert den Pfad zu Ihrem Dataset als Argument.
Erstellen Sie in der Cloud Shell eine Datei mit dem Namen benchmark.py. Sie können den folgenden Befehl kopieren und einfügen, um die Datei zu erstellen:
cat << 'EOF' > benchmark.py
import argparse
import sys
from pyspark.sql import SparkSession
import time
def main():
parser = argparse.ArgumentParser(description='TPC-DS Benchmark')
parser.add_argument('data_path', help='GCS base path for TPC-DS tables')
args = parser.parse_args()
base_path = args.data_path
# Initialize Spark Session
spark = SparkSession.builder \
.appName("TPC-DS Benchmark") \
.getOrCreate()
print(f"Spark Session created. Registering TPC-DS tables from {base_path}...")
# List of all 24 TPC-DS tables
tables = [
"call_center", "catalog_page", "catalog_returns", "catalog_sales",
"customer", "customer_address", "customer_demographics", "date_dim",
"household_demographics", "income_band", "inventory", "item",
"promotion", "reason", "ship_mode", "store", "store_returns",
"store_sales", "time_dim", "warehouse", "web_page", "web_returns",
"web_sales", "web_site"
]
# Register each table as a temporary view
# For this subset of queries, not every table is used
for table in tables:
path = f"{base_path}/{table}"
try:
df = spark.read.parquet(path)
df.createOrReplaceTempView(table)
except Exception as e:
print(f"Warning: Could not load table {table} from {path}. Error: {e}")
print("Tables registered successfully. Starting benchmark queries from Apache Spark test suite...")
# Standard TPC-DS Queries sourced from Apache Spark public repository:
# https://github.com/apache/spark/tree/master/sql/core/src/test/resources/tpcds
queries = {
"Q1": """
WITH customer_total_return AS (
SELECT sr_customer_sk AS ctr_customer_sk,
sr_store_sk AS ctr_store_sk,
sum(sr_return_amt) AS ctr_total_return
FROM store_returns, date_dim
WHERE sr_returned_date_sk = d_date_sk
AND d_year = 2000
GROUP BY sr_customer_sk, sr_store_sk
)
SELECT c_customer_id
FROM customer_total_return ctr1, store, customer
WHERE ctr1.ctr_total_return > (
SELECT avg(ctr_total_return) * 1.2
FROM customer_total_return ctr2
WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk
)
AND s_store_sk = ctr1.ctr_store_sk
AND s_state = 'TN'
AND ctr1.ctr_customer_sk = c_customer_sk
ORDER BY c_customer_id
LIMIT 100
""",
"Q2": """
WITH wscs AS (
SELECT sold_date_sk, sales_price
FROM (
SELECT ws_sold_date_sk AS sold_date_sk, ws_ext_sales_price AS sales_price
FROM web_sales
UNION ALL
SELECT cs_sold_date_sk AS sold_date_sk, cs_ext_sales_price AS sales_price
FROM catalog_sales
)
),
wswscs AS (
SELECT d_week_seq,
sum(CASE WHEN (d_day_name='Sunday') THEN sales_price ELSE null END) AS sun_sales,
sum(CASE WHEN (d_day_name='Monday') THEN sales_price ELSE null END) AS mon_sales,
sum(CASE WHEN (d_day_name='Tuesday') THEN sales_price ELSE null END) AS tue_sales,
sum(CASE WHEN (d_day_name='Wednesday') THEN sales_price ELSE null END) AS wed_sales,
sum(CASE WHEN (d_day_name='Thursday') THEN sales_price ELSE null END) AS thu_sales,
sum(CASE WHEN (d_day_name='Friday') THEN sales_price ELSE null END) AS fri_sales,
sum(CASE WHEN (d_day_name='Saturday') THEN sales_price ELSE null END) AS sat_sales
FROM wscs, date_dim
WHERE d_date_sk = sold_date_sk
GROUP BY d_week_seq
)
SELECT d_week_seq1,
round(sun_sales1/sun_sales2, 2),
round(mon_sales1/mon_sales2, 2),
round(tue_sales1/tue_sales2, 2),
round(wed_sales1/wed_sales2, 2),
round(thu_sales1/thu_sales2, 2),
round(fri_sales1/fri_sales2, 2),
round(sat_sales1/sat_sales2, 2)
FROM (
SELECT wswscs.d_week_seq AS d_week_seq1,
sun_sales AS sun_sales1, mon_sales AS mon_sales1,
tue_sales AS tue_sales1, wed_sales AS wed_sales1,
thu_sales AS thu_sales1, fri_sales AS fri_sales1,
sat_sales AS sat_sales1
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001
) y,
(
SELECT wswscs.d_week_seq AS d_week_seq2,
sun_sales AS sun_sales2, mon_sales AS mon_sales2,
tue_sales AS tue_sales2, wed_sales AS wed_sales2,
thu_sales AS thu_sales2, fri_sales AS fri_sales2,
sat_sales AS sat_sales2
FROM wswscs, date_dim
WHERE date_dim.d_week_seq = wswscs.d_week_seq
AND d_year = 2001 + 1
) z
WHERE d_week_seq1 = d_week_seq2 - 53
ORDER BY d_week_seq1
""",
"Q3": """
SELECT dt.d_year,
item.i_brand_id AS brand_id,
item.i_brand AS brand,
sum(ss_ext_sales_price) AS sum_agg
FROM date_dim dt,
store_sales,
item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
AND store_sales.ss_item_sk = item.i_item_sk
AND item.i_manufact_id = 436
AND dt.d_moy = 12
GROUP BY dt.d_year,
item.i_brand,
item.i_brand_id
ORDER BY dt.d_year,
sum_agg DESC,
brand_id
LIMIT 100
""",
"Q7": """
SELECT i_item_id,
avg(ss_quantity) AS agg1,
avg(ss_list_price) AS agg2,
avg(ss_coupon_amt) AS agg3,
avg(ss_sales_price) AS agg4
FROM store_sales,
customer_demographics,
date_dim,
item,
promotion
WHERE ss_sold_date_sk = d_date_sk
AND ss_item_sk = i_item_sk
AND ss_cdemo_sk = cd_demo_sk
AND ss_promo_sk = p_promo_sk
AND cd_gender = 'M'
AND cd_marital_status = 'S'
AND cd_education_status = 'College'
AND (p_channel_email = 'N' OR p_channel_event = 'N')
AND d_year = 2000
GROUP BY i_item_id
ORDER BY i_item_id
LIMIT 100
""",
"Q19": """
SELECT i_item_id,
i_brand,
i_category,
i_class,
i_manufact,
sum(ss_ext_sales_price) AS sales,
sum(ss_net_profit) AS profit
FROM date_dim,
store_sales,
item,
customer,
store
WHERE d_date_sk = ss_sold_date_sk
AND i_item_sk = ss_item_sk
AND d_year = 2000
AND d_moy = 12
AND c_customer_sk = ss_customer_sk
AND s_store_sk = ss_store_sk
AND i_manager_id = 9
GROUP BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
ORDER BY i_item_id,
i_brand,
i_category,
i_class,
i_manufact
LIMIT 100
"""
}
total_start_time = time.time()
for query_name, query_sql in queries.items():
print(f"\nExecuting {query_name}...")
query_start = time.time()
# Execute query and force action using show()
result_df = spark.sql(query_sql)
result_df.show(5) # Show top 5 rows
query_end = time.time()
print(f"{query_name} completed in {query_end - query_start:.2f} seconds.")
total_end_time = time.time()
print(f"\nAll benchmark queries completed in {total_end_time - total_start_time:.2f} seconds.")
spark.stop()
if __name__ == "__main__":
main()
EOF
Kopieren Sie das Skript in Ihren Cloud Storage-Bucket, damit Serverless for Apache Spark darauf zugreifen kann:
gcloud storage cp benchmark.py gs://${BUCKET_NAME}/scripts/benchmark.py
4. Serverless-Baseline-Job ausführen
Um einen Baseline-Vergleich ohne Lightning Engine zu ermöglichen, senden Sie den zuvor hochgeladenen PySpark-Benchmark-Job an die Serverless for Apache Spark-Stufe „Standard“. Wir übergeben den Pfad zum kopierten Dataset als Argument.
Führen Sie den folgenden Befehl aus, um den Batchjob auszuführen:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
-- ${DATASET_PATH}
Job beobachten
Während der Job ausgeführt wird, werden PySpark-Logs in Ihr Cloud Shell-Terminal gestreamt. Serverless for Apache Spark weist Container zu, liest das TPC-DS-Parquet-Dataset aus Cloud Storage und führt die komplexen SQL-Pläne aus.
Sehen Sie sich nach Abschluss des Skripts die Konsolenausgabe an. Sie sollten Ergebnisse und Zeiten für jede ausgeführte Standardabfrage sehen, ähnlich wie hier:
... Executing Q1... +-------------+ |c_customer_id| +-------------+ ... Q1 completed in 18.52 seconds. ... All benchmark queries completed in 110.94 seconds.
Notieren Sie sich die Gesamtzeit in Sekunden. Das ist Ihre Baseline-Laufzeit.
5. Mit Serverless Premium und Lightning Engine ausführen
Als Nächstes führen Sie denselben Spark-Job in Serverless for Apache Spark aus, verwenden aber die Stufe „Premium“ und aktivieren die native, vektorisierte Abfrage-Engine von Google: die Lightning Engine.
Senden Sie den Benchmark-Job an Serverless, wobei die Lightning Engine explizit aktiviert ist:
gcloud dataproc batches submit pyspark \
gs://${BUCKET_NAME}/scripts/benchmark.py \
--region=${REGION} \
--version=2.3 \
--deps-bucket=gs://${BUCKET_NAME} \
--properties="dataproc.tier=premium,spark.dataproc.lightningEngine.runtime=native" \
-- ${DATASET_PATH}
Ergebnisse vergleichen
Warten Sie, bis der Job abgeschlossen ist, und sehen Sie sich die Ausgabe an. Sie sollten dieselben Abfrageergebnisse sehen. Sehen Sie sich die Abschlusszeit genau an:
... All benchmark queries completed in 64.24 seconds.
Wenn Sie die Baseline-Ausführung von Serverless mit der Ausführung von Serverless mit Lightning Engine vergleichen, werden Sie feststellen, dass die Lightning Engine die Gruppierung, Aggregationen und Joins schneller ausführt. Dazu werden eine native C++-Ausführungsschicht und die vektorisierte Verarbeitung im Backend verwendet, ohne dass Änderungen am PySpark-Anwendungscode erforderlich sind.
Die Lightning Engine ist für eine höhere Leistung optimiert, je größer die Arbeitslast ist. In diesem Beispiel verwenden wir ein kleines Dataset, daher ist die Leistungssteigerung nicht so drastisch wie möglich. Bei einem 10-TB-Dataset wurde in Benchmarks eine bis zu 4, 3-mal höhere Leistung im Vergleich zu Open-Source-Spark nachgewiesen.
6. Ausführungsgraphen in der Spark-UI vergleichen
Die Reduzierung der Laufzeit ist beeindruckend, aber sehen wir uns genauer an , was Spark während der Abfrageausführung tatsächlich tut. Dazu können Sie die Ausführungsgraphen der Spark-UI für beide Jobs untersuchen.
- Öffnen Sie die Google Cloud Console in Ihrem Browser.
- Rufen Sie Dataproc > Batches auf.
- In der Liste sehen Sie zwei Batches: Ihre Standard-Baseline-Ausführung und Ihre Ausführung mit der Stufe „Premium“.
- Klicken Sie auf den Batch der Stufe „Premium“, den Sie ausgeführt haben, dann auf Spark-UI ansehen und dann auf Details ansehen.
- Rufen Sie in der Spark-UI den Tab Jobs auf.
- Geben Sie unter Abgeschlossene Jobs im Suchfeld
Veloxein. - Sie sehen viele Jobbeschreibungen, die
VeloxSparkPlanExecApienthalten. Dies bezieht sich auf die native Ausführungs-Engine von Velox, die von der Lightning Engine verwendet wird.
Wiederholen Sie diesen Vorgang für die Ausführung mit der Stufe „Standard“:
- Kehren Sie zur Seite „Serverless for Apache Spark-Batches“ zurück.
- Klicken Sie auf den Link für den Batch der Stufe „Standard“, dann auf Spark-UI ansehen und dann auf Details ansehen.
- Rufen Sie in der Spark-UI den Tab Jobs auf.
- Geben Sie unter Abgeschlossene Jobs im Suchfeld
Veloxein. - In den Jobbeschreibungen wird die Velox API nicht erwähnt.
7. Bereinigen
Löschen Sie die in diesem Codelab erstellten Ressourcen, um laufende Kosten für Ihr Google Cloud-Konto zu vermeiden.
Löschen Sie in der Cloud Shell den Cloud Storage-Bucket und seinen Inhalt:
gcloud storage rm -r gs://${BUCKET_NAME}
Löschen Sie Ihre lokale Kopie von benchmark.py:
rm benchmark.py
8. Glückwunsch
Glückwunsch! Sie haben erfolgreich eine Benchmark-Umgebung für Apache Spark erstellt und Serverless for Apache Spark Standard mit Serverless for Apache Spark Premium verglichen.
Sie haben aus erster Hand gesehen, wie die neue Lightning Engine von Serverless for Apache Spark die Laufzeit Ihrer Spark-Arbeitslast reduzieren kann. Außerdem haben Sie in der Spark-UI gesehen, wie der physische Ausführungsgraph mit der Native Query Engine in nativen C++-Code umgewandelt wird.
Gelernte Inhalte
- PySpark-Skript zum Benchmarking von Datasets schreiben
- Spark-Jobs an Serverless for Apache Spark senden
- Lightning Engine aktivieren
- Jobpläne in der Spark-UI vergleichen
Nächste Schritte
- Dokumentation zu Serverless for Apache Spark
- Siehe das Qualifizierungstool für die native Abfrageausführung
- Vollständige TPC-DS-Benchmark-Abfragen auf GitHub