1. Wprowadzenie do Google Dataproc
Dataproc to usługa w pełni zarządzana i wysoce skalowalna do uruchamiania Apache Spark, Apache Flink, Presto oraz wielu innych narzędzi i platform open source. Możesz wykorzystać Dataproc do modernizacji jezior danych, realizacji procesów ETL / ELT i bezpiecznego badania danych na globalną skalę. Dataproc jest też w pełni zintegrowany z kilkoma usługami Google Cloud, takimi jak BigQuery, Cloud Storage, Vertex AI i Dataplex.
Usługa Dataproc jest dostępna w 3 wariantach:
- Dataproc Serverless umożliwia uruchamianie zadań PySpark bez konieczności konfigurowania infrastruktury i autoskalowania. Dataproc Serverless obsługuje zadania wsadowe i sesje / notatniki PySpark.
- Dataproc w Google Compute Engine umożliwia zarządzanie klastrem Hadoop YARN dla zadań Spark opartych na YARN, a także narzędzi open source, takich jak Flink i Presto. Możesz dostosować klastry działające w chmurze za pomocą dowolnego skalowania w pionie lub w poziomie, w tym autoskalowania.
- Dataproc w Google Kubernetes Engine umożliwia konfigurowanie wirtualnych klastrów Dataproc w infrastrukturze GKE na potrzeby przesyłania zadań Spark, PySpark, SparkR i Spark SQL.
2. Tworzenie klastra Dataproc w sieci VPC Google Cloud
W tym kroku utworzysz klaster Dataproc w Google Cloud za pomocą konsoli Google Cloud.
Najpierw włącz interfejs API usługi Dataproc w konsoli. Po włączeniu tej usługi wyszukaj „Dataproc” na pasku wyszukiwania i kliknij Utwórz klaster.
Aby używać maszyn wirtualnych Google Compute Engine(GCE) jako infrastruktury bazowej do uruchamiania klastrów Dataproc, wybierz Klaster w Compute Engine.

Wyświetli się strona tworzenia klastra.

Na tej stronie:
- Nadaj klastrowi niepowtarzalną nazwę.
- Wybierz konkretny region. Możesz też wybrać strefę, ale Dataproc może ją wybrać automatycznie. Na potrzeby tego ćwiczenia wybierz „us-central1” i „us-central1-c”.
- Wybierz typ klastra „Standardowy”. Dzięki temu będzie tylko jeden węzeł główny.
- Na karcie Skonfiguruj węzły sprawdź, czy utworzone zostaną 2 instancje robocze.
- W sekcji Dostosuj klaster zaznacz pole obok opcji Włącz bramę komponentów. Umożliwia to dostęp do interfejsów internetowych w klastrze, w tym interfejsu Spark, menedżera węzłów Yarn i notatników Jupyter.
- W sekcji Komponenty opcjonalne wybierz Jupyter Notebook. Spowoduje to skonfigurowanie klastra z serwerem notatników Jupyter.
- Pozostaw inne ustawienia bez zmian i kliknij Utwórz klaster.
Spowoduje to uruchomienie klastra Dataproc.
3. Uruchamianie klastra i łączenie się z nim przez SSH
Gdy stan klastra zmieni się na Działający, kliknij nazwę klastra w konsoli Dataproc.

Kliknij kartę Instancja maszyny wirtualnej, aby wyświetlić węzeł główny i 2 węzły robocze klastra.

Aby zalogować się na węzeł główny, kliknij SSH obok węzła głównego.

Uruchom polecenia hdfs, aby wyświetlić strukturę katalogu.
hadoop_commands_example
sudo hadoop fs -ls /
sudo hadoop version
sudo hadoop fs -mkdir /test51
sudo hadoop fs -ls /
4. Interfejsy internetowe i bramy komponentów
W konsoli klastra Dataproc kliknij nazwę klastra, a następnie kliknij kartę INTERFEJSY WEB.

Wyświetla dostępne interfejsy internetowe, w tym Jupyter. Kliknij Jupyter, aby otworzyć notatnik Jupyter. Możesz użyć tej funkcji do tworzenia notatników w PySpark przechowywanych w GCS. Aby zapisać notatnik w Google Cloud Storage i otworzyć notatnik PySpark do użycia w tym ćwiczeniu, wykonaj te czynności.
5. Monitorowanie i obserwowanie zadań Spark
Po uruchomieniu klastra Dataproc utwórz zadanie wsadowe PySpark i prześlij je do klastra Dataproc.
Utwórz zasobnik Google Cloud Storage (GCS), w którym będzie przechowywany skrypt PySpark. Zadbaj o to, aby utworzyć zasobnik w tym samym regionie co klaster Dataproc.

Teraz, gdy zasobnik GCS został utworzony, skopiuj do niego ten plik.
https://raw.githubusercontent.com/diptimanr/spark-on-gce/main/test-spark-1.py
Ten skrypt tworzy przykładowy obiekt Spark DataFrame i zapisuje go jako tabelę Hive.
hive_job.py
from pyspark.sql import SparkSession
from datetime import datetime, date
from pyspark.sql import Row
spark = SparkSession.builder.master("local").enableHiveSupport().getOrCreate()
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)), (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
print("..... Writing data .....")
df.write.mode("overwrite").saveAsTable("test_table_1")
print("..... Complete .....")
Prześlij ten skrypt jako zadanie wsadowe Spark w Dataproc. W menu nawigacyjnym po lewej stronie kliknij Zadania, a następnie Prześlij zadanie.

Podaj Identyfikator zadania i region. Wybierz klaster i podaj lokalizację w GCS skopiowanego skryptu Spark. To zadanie zostanie uruchomione jako zadanie wsadowe Spark w Dataproc.
W sekcji Properties (Właściwości) dodaj klucz spark.submit.deployMode i wartość client, aby sterownik działał w węźle głównym Dataproc, a nie w węzłach instancji roboczych. Kliknij Prześlij, aby przesłać zadanie wsadowe do Dataproc.

Skrypt Spark utworzy ramkę danych i zapisze ją w tabeli Hivetest_table_1.
Po pomyślnym uruchomieniu zadania możesz wyświetlić instrukcje drukowania w konsoli na karcie Monitorowanie.

Po utworzeniu tabeli Hive prześlij kolejne zadanie zapytania Hive, aby wybrać zawartość tabeli i wyświetlić ją w konsoli.
Utwórz kolejne zadanie o tych właściwościach:

Zwróć uwagę, że Typ zadania jest ustawiony na Hive, a typ źródła zapytania to Tekst zapytania, co oznacza, że całą instrukcję HiveQL wpiszemy w polu tekstowym Tekst zapytania.
Prześlij zadanie, pozostawiając pozostałe parametry jako domyślne.

Zwróć uwagę, jak HiveQL wybiera wszystkie rekordy i wyświetla je w konsoli.
6. Autoskalowanie
Autoskalowanie to zadanie polegające na oszacowaniu „odpowiedniej” liczby węzłów roboczych klastra dla danego zadania.
Interfejs Dataproc AutoscalingPolicies API zapewnia mechanizm automatyzacji zarządzania zasobami klastra i umożliwia autoskalowanie instancji roboczych klastra. Zasada autoskalowania to konfiguracja wielokrotnego użytku, która opisuje sposób, w jaki powinny być skalowane węzły robocze klastra, w których stosowana jest dana zasada autoskalowania. Definiuje granice, częstotliwość i agresywność skalowania, zapewniając szczegółową kontrolę nad zasobami klastra od początku jego istnienia.
Zasady autoskalowania Dataproc są zapisywane w plikach YAML, które są przekazywane w poleceniu interfejsu wiersza poleceń służącym do tworzenia klastra lub wybierane z zasobnika GCS podczas tworzenia klastra w konsoli Cloud.
Oto przykład zasad autoskalowania Dataproc :
policy.yaml
workerConfig:
minInstances: 10
maxInstances: 10
secondaryWorkerConfig:
maxInstances: 50
basicAlgorithm:
cooldownPeriod: 4m
yarnConfig:
scaleUpFactor: 0.05
scaleDownFactor: 1.0
gracefulDecommissionTimeout: 1h
7. Konfigurowanie opcjonalnych komponentów Dataproc
Spowoduje to uruchomienie klastra Dataproc.
Gdy tworzysz klaster Dataproc, standardowe komponenty ekosystemu Apache Hadoop są automatycznie instalowane w klastrze (patrz Lista wersji Dataproc). Podczas tworzenia klastra możesz zainstalować w nim dodatkowe komponenty, zwane komponentami opcjonalnymi.

Podczas tworzenia klastra Dataproc w konsoli włączyliśmy komponenty opcjonalne i wybraliśmy Jupyter Notebook jako komponent opcjonalny.
8. Zwalnianie miejsca
Aby wyczyścić klaster, kliknij Zatrzymaj po wybraniu klastra w konsoli Dataproc. Gdy klaster się zatrzyma, kliknij Usuń, aby go usunąć.
Po usunięciu klastra Dataproc usuń zasobniki GCS, do których skopiowano kod.
Aby usunąć zasoby i zatrzymać niechciane naliczanie opłat, musisz najpierw zatrzymać, a potem usunąć klaster Dataproc.
Zanim zatrzymasz i usuniesz klaster, upewnij się, że wszystkie dane zapisane w pamięci HDFS zostały skopiowane do GCS w celu trwałego przechowywania.
Aby zatrzymać klaster, kliknij Zatrzymaj.

Gdy klaster się zatrzyma, kliknij Usuń, aby go usunąć.
W oknie potwierdzenia kliknij Usuń, aby usunąć klaster.
