1. Introduzione - Google Dataproc
Dataproc è un servizio completamente gestito e a scalabilità elevata per l'esecuzione di Apache Spark, Apache Flink, Presto e molti altri strumenti e framework open source. Utilizza Dataproc per la modernizzazione dei data lake, l'ETL / ELT e la data science sicura su scala planetaria. Dataproc è anche completamente integrato con diversi servizi Google Cloud, tra cui BigQuery, Cloud Storage, Vertex AI e Dataplex.
Dataproc è disponibile in tre versioni:
- Dataproc Serverless consente di eseguire job PySpark senza dover configurare l'infrastruttura e la scalabilità automatica. Dataproc Serverless supporta le sessioni e i carichi di lavoro batch di PySpark.
- Dataproc su Google Compute Engine consente di gestire un cluster Hadoop YARN per carichi di lavoro Spark basati su YARN, oltre a strumenti open source come Flink e Presto. Puoi personalizzare i tuoi cluster basati su cloud con la scalabilità verticale o orizzontale che preferisci, inclusa la scalabilità automatica.
- Dataproc su Google Kubernetes Engine consente di configurare cluster virtuali Dataproc nella tua infrastruttura GKE per l'invio di job Spark, PySpark, SparkR o Spark SQL.
2. Crea un cluster Dataproc in un VPC Google Cloud
In questo passaggio, creerai un cluster Dataproc su Google Cloud utilizzando la console Google Cloud.
Come primo passaggio, abilita l'API del servizio Dataproc nella console. Una volta attivato, cerca "Dataproc" nella barra di ricerca e fai clic su Crea cluster.
Seleziona Cluster su Compute Engine per utilizzare le VM Google Compute Engine(GCE) come infrastruttura sottostante per eseguire i cluster Dataproc.

Ora ti trovi nella pagina di creazione del cluster.

In questa pagina:
- Fornisci un nome univoco per il cluster.
- Seleziona la regione specifica. Puoi anche selezionare una zona, ma Dataproc offre la possibilità di sceglierne una automaticamente. Per questo codelab, seleziona "us-central1" e "us-central1-c".
- Seleziona il tipo di cluster "Standard". In questo modo, è presente un solo nodo master.
- Nella scheda Configura nodi, verifica che il numero di worker creati sia due.
- Nella sezione Personalizza cluster, seleziona la casella accanto ad Attiva gateway dei componenti. In questo modo è possibile accedere alle interfacce web sul cluster, tra cui Spark UI, Yarn Node Manager e i notebook Jupyter.
- In Componenti facoltativi, seleziona Jupyter Notebook.. In questo modo il cluster viene configurato con un server di blocchi note Jupyter.
- Lascia invariato tutto il resto e fai clic su Crea cluster.
Verrà avviato un cluster Dataproc.
3. Avvia il cluster e connettiti tramite SSH
Quando lo stato del cluster cambia in In esecuzione, fai clic sul nome del cluster nella console Dataproc.

Fai clic sulla scheda Istanza VM per visualizzare il nodo master e i due nodi worker del cluster.

Fai clic su SSH accanto al nodo master per accedere al nodo master.

Esegui i comandi hdfs per visualizzare la struttura della directory.
hadoop_commands_example
sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51
sudo hadoop fs -ls /
4. Interfacce web e gateway dei componenti
Dalla console del cluster Dataproc, fai clic sul nome del cluster, quindi sulla scheda INTERFACCE WEB.

Vengono visualizzate le interfacce web disponibili, tra cui Jupyter. Fai clic su Jupyter per aprire un notebook Jupyter. Puoi utilizzarlo per creare blocchi note in PySpark archiviati su GCS. per archiviare il blocco note su Google Cloud Storage e aprire un blocco note PySpark da utilizzare in questo codelab.
5. Monitorare e osservare i job Spark
Con il cluster Dataproc attivo e in esecuzione, crea un job batch PySpark e invialo al cluster Dataproc.
Crea un bucket Google Cloud Storage (GCS) per archiviare lo script PySpark. Assicurati di creare il bucket nella stessa regione del cluster Dataproc.

Ora che il bucket GCS è stato creato, copia il seguente file al suo interno.
https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py
Questo script crea un DataFrame Spark di esempio e lo scrive come tabella Hive.
hive_job.py
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")
Invia questo script come job batch Spark in Dataproc. Fai clic su Job nel menu di navigazione a sinistra, quindi su Invia job.

Fornisci un ID job e una regione. Seleziona il cluster e fornisci la posizione GCS dello script Spark che hai copiato. Questo job verrà eseguito come job batch Spark su Dataproc.
In Proprietà, aggiungi la chiave spark.submit.deployMode e il valore client per assicurarti che il driver venga eseguito nel nodo master Dataproc e non nei nodi worker. Fai clic su Invia per inviare il job batch a Dataproc.

Lo script Spark creerà un DataFrame e scriverà in una tabella Hive test_table_1.
Una volta eseguito correttamente il job, puoi visualizzare le istruzioni di stampa della console nella scheda Monitoraggio.

Ora che la tabella Hive è stata creata, invia un altro job di query Hive per selezionare i contenuti della tabella e visualizzarli nella console.
Crea un altro job con le seguenti proprietà:

Nota che il Tipo di job è impostato su Hive e il tipo di origine della query è Testo della query, il che significa che scriveremo l'intera istruzione HiveQL nella casella di testo Testo della query.
Invia il job mantenendo i valori predefiniti per gli altri parametri.

Nota come HiveQL seleziona tutti i record e li visualizza nella console.
6. Scalabilità automatica
La scalabilità automatica è l'attività di stima del numero "giusto" di nodi worker del cluster per un carico di lavoro.
L'API AutoscalingPolicies di Dataproc offre un meccanismo per automatizzare la gestione delle risorse cluster e consente la scalabilità automatica delle VM worker del cluster. Un criterio di scalabilità automatica è una configurazione riutilizzabile che descrive in che modo deve essere scalato il numero di worker del cluster che utilizzano il criterio di scalabilità automatica. Definisce i limiti, la frequenza e l'aggressività della scalabilità per fornire un controllo dettagliato sulle risorse del cluster nel corso del suo ciclo di vita.
Le norme di scalabilità automatica di Dataproc vengono scritte utilizzando file YAML e questi file YAML vengono passati nel comando CLI per la creazione del cluster o selezionati da un bucket GCS quando un cluster viene creato dalla console Cloud.
Ecco un esempio di criterio di scalabilità automatica Dataproc :
policy.yaml
workerConfig:
minInstances: 10
maxInstances: 10
secondaryWorkerConfig:
maxInstances: 50
basicAlgorithm:
cooldownPeriod: 4m
yarnConfig:
scaleUpFactor: 0.05
scaleDownFactor: 1.0
gracefulDecommissionTimeout: 1h
7. Configura i componenti facoltativi di Dataproc
Verrà avviato un cluster Dataproc.
Quando crei un cluster Dataproc, i componenti standard dell'ecosistema Apache Hadoop vengono installati automaticamente sul cluster (vedi Elenco delle versioni di Dataproc). Quando crei il cluster, puoi installare componenti aggiuntivi, chiamati componenti facoltativi.

Durante la creazione del cluster Dataproc dalla console, abbiamo abilitato i componenti opzionali e selezionato Jupyter Notebook come componente opzionale.
8. Libera spazio
Per liberare spazio nel cluster, fai clic su Arresta dopo aver selezionato il cluster dalla console Dataproc. Una volta arrestato il cluster, fai clic su Elimina per eliminarlo.
Dopo aver eliminato il cluster Dataproc, elimina i bucket GCS in cui è stato copiato il codice.
Per eseguire la pulizia delle risorse e interrompere qualsiasi fatturazione indesiderata, il cluster Dataproc deve essere prima arrestato e poi eliminato.
Prima di arrestare ed eliminare il cluster, assicurati che tutti i dati scritti nell'archivio HDFS vengano copiati in GCS per l'archiviazione durevole.
Per interrompere il cluster, fai clic su Interrompi.

Una volta arrestato il cluster, fai clic su Elimina per eliminarlo.
Nella finestra di dialogo di conferma, fai clic su Elimina per eliminare il cluster.
