Uruchamianie zadania liczby słów Hadoop w klastrze Dataproc

1. Wprowadzenie

Procesy to typowy przypadek użycia w analityce danych. Polegają one na pozyskiwaniu, przekształcaniu i analizowaniu danych w celu znajdowania w nich istotnych informacji. Narzędziem do administrowania przepływami pracy w Google Cloud Platform jest Cloud Composer – hostowana wersja popularnego narzędzia open source do zarządzania przepływami pracy Apache Airflow. W tym module użyjesz Cloud Composer do utworzenia prostego przepływu pracy, który tworzy klaster Cloud Dataproc, analizuje go za pomocą Cloud Dataproc i Apache Hadoop, a następnie usuwa klaster Cloud Dataproc.

Co to jest Cloud Composer?

Cloud Composer to w pełni zarządzana usługa administracji przepływów pracy, która umożliwia tworzenie, planowanie i monitorowanie potoków obejmujących chmury i centra danych na potrzeby lokalnych systemów. Platforma Cloud Composer została opracowana na podstawie popularnego projektu open source Apache Airflow i działa w języku programowania Python. Jest łatwa w użyciu i nie wiąże Cię z żadną technologią.

Dzięki korzystaniu z Cloud Composer zamiast lokalnej instancji Apache Airflow użytkownicy mogą korzystać z najlepszych funkcji Airflow bez konieczności instalowania tej usługi ani zarządzania nią.

Co to jest Apache Airflow?

Apache Airflow to narzędzie typu open source służące do programowego tworzenia, planowania i monitorowania przepływów pracy. W trakcie modułu warto pamiętać o kilku kluczowych terminach związanych z Airflow:

  • DAG – DAG (kierowany acykliczny wykres) to zbiór uporządkowanych zadań, które chcesz zaplanować i uruchomić. DAG-i, nazywane też przepływami pracy, są zdefiniowane w standardowych plikach w Pythonie.
  • Operator – operator opisujący pojedyncze zadanie w przepływie pracy.

Co to jest Cloud Dataproc?

Cloud Dataproc to w pełni zarządzana usługa Apache Spark i Apache Hadoop w Google Cloud Platform. Cloud Dataproc łatwo integruje się z innymi usługami GCP, co zapewnia wydajną i kompleksową platformę do przetwarzania danych, analizy i uczenia maszynowego.

Czynności wykonane

Z tego ćwiczenia w Codelabs dowiesz się, jak utworzyć i uruchomić przepływ pracy Apache Airflow w Cloud Composer, który wykonuje te zadania:

  • tworzy klaster Cloud Dataproc,
  • Uruchamia w klastrze zadanie liczby słów Apache Hadoop i przesyła jego wyniki do Cloud Storage
  • Usuwa klaster

Czego się nauczysz

  • Jak utworzyć i uruchomić przepływ pracy Apache Airflow w Cloud Composer
  • Jak użyć Cloud Composer i Cloud Dataproc do przeprowadzenia analizy zbioru danych
  • Jak uzyskać dostęp do środowiska Cloud Composer za pomocą konsoli Google Cloud Platform, pakietu Cloud SDK i interfejsu internetowego Airflow

Czego potrzebujesz

  • Konto GCP
  • podstawowa znajomość wiersza poleceń,
  • Podstawowa znajomość języka Python

2. Konfigurowanie GCP

Tworzenie projektu

Wybierz lub utwórz projekt Google Cloud Platform.

Zapisz identyfikator projektu. Użyjesz go w kolejnych krokach.

Jeśli tworzysz nowy projekt, identyfikator projektu znajdziesz na stronie tworzenia poniżej nazwy projektu.

Jeśli masz już utworzony projekt, identyfikator znajdziesz na stronie głównej konsoli na karcie Informacje o projekcie.

Włączanie interfejsów API

Włącz interfejsy Cloud Composer, Cloud Dataproc i Cloud Storage API.Po ich włączeniu możesz zignorować przycisk „Go to Credentials” (Otwórz dane logowania) i przejść do następnego kroku samouczka.

Tworzenie środowiska Composer

Utwórz środowisko Cloud Composer o tej konfiguracji:

  • Nazwa: my-composer-environment
  • Lokalizacja: us-central1
  • Strefa: us-central1-a

Wszystkie inne konfiguracje mogą pozostać domyślne. U dołu kliknij „Utwórz”.

Tworzenie zasobnika Cloud Storage

Utwórz w projekcie zasobnik Cloud Storage o tej konfiguracji:

  • Nazwa: <identyfikator-twojego-projektu>
  • Domyślna klasa pamięci masowej: wiele regionów
  • Lokalizacja: Stany Zjednoczone
  • Model kontroli dostępu: szczegółowa

Gdy wszystko będzie gotowe, kliknij „Utwórz”

3. Konfigurowanie Apache Airflow

Wyświetlanie informacji o środowisku Composer

W konsoli Google Cloud Platform otwórz stronę Środowiska.

Kliknij nazwę środowiska, aby wyświetlić jego szczegóły.

Na stronie Szczegóły środowiska znajdziesz informacje takie jak URL interfejsu internetowego Airflow, identyfikator klastra Google Kubernetes Engine, nazwa zasobnika Cloud Storage i ścieżka do folderu /dags.

W Airflow DAG (skierowany graf acykliczny) to zbiór uporządkowanych zadań, które chcesz zaplanować i wykonać. DAG-i, nazywane też przepływami pracy, są zdefiniowane w standardowych plikach Pythona. Cloud Composer harmonogramuje tylko DAG-i w folderze /dags. Folder /dags znajduje się w zasobniku Cloud Storage, który Cloud Composer tworzy automatycznie podczas tworzenia środowiska.

Ustawianie zmiennych środowiskowych Apache Airflow

Zmienne Apache Airflow to pojęcie specyficzne dla Airflow, które różni się od zmiennych środowiskowych. W tym kroku skonfigurujesz 3 zmienną Airflow: gcp_project, gcs_bucketgce_zone.

Używanie gcloud do ustawiania zmiennych

Najpierw otwórz Cloud Shell, w którym jest już zainstalowany pakiet Cloud SDK.

Ustaw zmienną środowiskową COMPOSER_INSTANCE na nazwę środowiska Composer.

COMPOSER_INSTANCE=my-composer-environment

Aby ustawić zmienne Airflow za pomocą narzędzia wiersza poleceń gcloud, użyj polecenia gcloud composer environments run z podrzędnym poleceniem variables. To polecenie gcloud composer wykonuje podrzędne polecenie interfejsu wiersza poleceń Airflow variables. Podpolecenie przekazuje argumenty do narzędzia wiersza poleceń gcloud.

Uruchom to polecenie 3 razy, zastępując zmienne odpowiednimi wartościami dla Twojego projektu.

Ustaw wartość gcp_project za pomocą poniższego polecenia, zastępując fragment <your-project-id> identyfikatorem projektu zapisanym w Kroku 2.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

Dane wyjściowe będą wyglądać mniej więcej tak

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

Ustaw gcs_bucket za pomocą tego polecenia, zastępując <your-bucket-name> identyfikatorem zasobnika zanotowanymi w kroku 2. Jeśli jest on zgodny z naszym zaleceniem, nazwa zasobnika jest taka sama jak identyfikator projektu. Dane wyjściowe będą podobne do tych z poprzedniego polecenia.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

Ustaw gce_zone za pomocą tego polecenia. Dane wyjściowe będą podobne do poprzednich poleceń.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(Opcjonalnie) Używanie funkcji gcloud do wyświetlania zmiennej

Aby zobaczyć wartość zmiennej, uruchom podrzędne polecenie interfejsu wiersza poleceń Airflow variables z argumentem get lub użyj interfejsu Airflow.

Na przykład:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

Możesz to zrobić za pomocą dowolnej z 3 zmiennych, które właśnie ustawiłeś: gcp_project, gcs_bucketgce_zone.

4. Przykładowy przepływ pracy

Przyjrzyjmy się kodom DAG, których użyjemy w kroku 5. Nie martw się jeszcze pobieraniem plików, tylko postępuj zgodnie z instrukcjami.

Jest tu sporo rzeczy do rozpakowania, więc omówimy nieco ten temat.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

Zaczniemy od importowania danych z Airflow:

  • airflow.models – umożliwia nam dostęp do danych w bazie danych Airflow i tworzenie w niej danych.
  • airflow.contrib.operators – gdzie mieszkają operatorzy ze społeczności. W tym przypadku potrzebujemy dataproc_operator, aby uzyskać dostęp do interfejsu Cloud Dataproc API.
  • airflow.utils.trigger_rule – służy do dodawania reguł reguł do naszych operatorów. Reguły reguł umożliwiają szczegółowe kontrolowanie tego, czy operator powinien się wykonać w zależności od stanu jego elementów nadrzędnych.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

Określa lokalizację pliku wyjściowego. Warto zauważyć tutaj wiersz models.Variable.get('gcs_bucket'), który pobierze wartość zmiennej gcs_bucket z bazy danych Airflow.

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR – lokalizacja pliku .jar, który ostatecznie uruchomimy w klastrze Cloud Dataproc. Jest już dla Ciebie hostowany w GCP.
  • input_file – lokalizacja pliku zawierającego dane, które zadanie Hadoop będzie w przyszłości obliczać. W kroku 5 razem prześlemy dane do tej lokalizacji.
  • wordcount_args – argumenty, które zostaną przekazane do pliku jar.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

Otrzymasz odpowiednik obiektu daty i godziny reprezentującego północ poprzedniego dnia. Jeśli na przykład zostanie to wykonane 4 marca o godzinie 11:00, obiekt datetime będzie reprezentować godzinę 00:00 3 marca. Ma to związek ze sposobem, w jaki Airflow obsługuje planowanie. Więcej informacji na ten temat znajdziesz tutaj.

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

Zmienna default_dag_args w postaci słownika należy podawać za każdym razem, gdy tworzony jest nowy DAG:

  • 'email_on_failure' – wskazuje, czy mają być wysyłane alerty e-mail w przypadku niepowodzenia zadania.
  • 'email_on_retry' – wskazuje, czy mają być wysyłane alerty e-mail po ponownym wykonaniu zadania.
  • 'retries' – wskazuje, ile ponownych prób ma wykonać Airflow w przypadku awarii DAG-a.
  • 'retry_delay' – wskazuje, jak długo Airflow ma czekać przed ponowną próbą.
  • 'project_id' – informuje DAG-a, z którym identyfikatorem projektu GCP ma być powiązany, który będzie potrzebny później operatorowi Dataproc.
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Użycie dyrektywy with models.DAG powoduje, że skrypt ma umieścić wszystkie elementy znajdujące się poniżej w tym samym DAG-u. Widzimy również 3 przekazywane argumenty:

  • Pierwszy z nich to ciąg znaków. To nazwa, która nadaje DAG-owi, który tworzymy. W tym przypadku używamy composer_hadoop_tutorial.
  • schedule_interval – obiekt datetime.timedelta, który w tym miejscu został ustawiony na 1 dzień. Oznacza to, że ten DAG będzie się starał wykonywać raz dziennie po 'start_date' ustawionym wcześniej w 'default_dag_args'.
  • default_args – wcześniej utworzony słownik zawierający domyślne argumenty dla DAG.

Tworzenie klastra Dataproc

Następnie utworzymy dataproc_operator.DataprocClusterCreateOperator, który utworzy klaster Cloud Dataproc.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

W tym operatorze widzimy kilka argumentów, z których wszystkie oprócz pierwszego są specyficzne dla tego operatora:

  • task_id – tak jak w przypadku BashOperator, jest to nazwa przypisana do operatora i widoczna w interfejsie Airflow.
  • cluster_name – nazwa przypisana klastrowi Cloud Dataproc. Nazwaliśmy ją composer-hadoop-tutorial-cluster-{{ ds_nodash }} (dodatkowe informacje znajdziesz w polu informacyjnym)
  • num_workers – liczba instancji roboczych przypisanych do klastra Cloud Dataproc
  • zone – region geograficzny, w którym ma działać klaster, zgodnie z danymi zapisanymi w bazie danych Airflow. Spowoduje to odczytanie zmiennej 'gce_zone' ustawionej w kroku 3
  • master_machine_type – typ maszyny, który chcemy przypisać do głównego serwera Cloud Dataproc
  • worker_machine_type – typ maszyny, którą chcesz przydzielić do instancji roboczej Cloud Dataproc

Przesyłanie zadania Apache Hadoop

dataproc_operator.DataProcHadoopOperator umożliwia nam przesłanie zadania do klastra Cloud Dataproc.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

Udostępniamy kilka parametrów:

  • task_id – nazwa przypisana do tego elementu DAG;
  • main_jar – lokalizacja pliku .jar, który ma być uruchamiany w klastrze.
  • cluster_name – nazwa klastra, na którym ma być wykonywane zadanie, identyczna z nazwą w poprzednim operatorze.
  • arguments – argumenty, które są przekazywane do pliku jar w taki sam sposób jak w przypadku wykonywania pliku jar z poziomu wiersza poleceń.

Usuwanie klastra

Ostatni operator, który utworzymy, to dataproc_operator.DataprocClusterDeleteOperator.

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

Jak sugeruje nazwa, ten operator usuwa dany klaster Cloud Dataproc. Występują tu 3 argumenty:

  • task_id – podobnie jak w przypadku BashOperator, jest to nazwa przypisana do operatora, która jest widoczna w interfejsie Airflow.
  • cluster_name – nazwa, którą przypisujemy klastrowi Cloud Dataproc. Teraz nazywamy go composer-hadoop-tutorial-cluster-{{ ds_nodash }} (opcjonalne dodatkowe informacje znajdziesz w polu informacji po „Utwórz klaster Dataproc”).
  • trigger_rule – na początku tego kroku omówiliśmy krótko reguły aktywatorów, ale oto chodzi o działanie. Domyślnie operator Airflow nie jest wykonywany, dopóki nie zostaną wykonane wszystkie jego operatory nadrzędne. Reguła reguły aktywacji ALL_DONE wymaga tylko, aby wszyscy operatorzy w górę łańcucha wykonali swoje działanie, niezależnie od tego, czy zakończyło się ono powodzeniem. W tym przypadku oznacza to, że nawet jeśli zadanie Hadoop się nie udało, i tak chcemy zniszczyć klaster.
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

Na koniec chcemy, aby operatory te były wykonywane w określonej kolejności, i możemy to wskazać za pomocą operatorów przesunięcia bitowego w języku Python. W tym przypadku najpierw zawsze będzie wykonywane wyrażenie create_dataproc_cluster, potem run_dataproc_hadoop, a na końcu delete_dataproc_cluster.

Po połączeniu wszystkiego kod wygląda tak:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. Przesyłanie plików Airflow do Cloud Storage

Skopiuj DAG-a do folderu /dags

  1. Najpierw otwórz Cloud Shell, do której masz wygodny zainstalowany pakiet SDK Cloud.
  2. Sklonuj repozytorium z przykładami kodu Python i przejdź do katalogu kompozytor/przepływy pracy
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. Uruchom to polecenie, aby ustawić nazwę folderu DAG-ów na zmienną środowiskową
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. Uruchom to polecenie gsutil, aby skopiować kod samouczka do miejsca tworzenia folderu /dags
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

Dane wyjściowe będą wyglądać mniej więcej tak:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. Za pomocą interfejsu Airflow

Aby uzyskać dostęp do interfejsu internetowego Airflow za pomocą konsoli GCP:

  1. Otwórz stronę Środowiska.
  2. W kolumnie Serwer WWW Airflow dla środowiska kliknij ikonę nowego okna. Internetowy interfejs użytkownika Airflow otworzy się w nowym oknie przeglądarki.

Informacje o interfejsie Airflow znajdziesz w artykule Uzyskiwanie dostępu do interfejsu internetowego.

Wyświetl zmienne

Zmienne ustawione wcześniej są przechowywane w środowisku. Aby wyświetlić zmienne, wybierz Administracja > Zmienne na pasku menu interfejsu Airflow.

Karta Lista jest zaznaczona i pokazuje tabelę z tymi kluczami i wartościami: klucz: gcp_project, wartość: identyfikator projektu; klucz: gcs_bucket, wartość: gs://nazwa_bucketa; klucz: gce_zone, wartość: strefa.

Poznawanie uruchomień DAG

Gdy prześlesz plik DAG do folderu dags w Cloud Storage, Cloud Composer przeanalizuje go. Jeśli nie zostaną znalezione żadne błędy, nazwa przepływu pracy pojawi się na liście DAG-a, a przepływ pracy zostanie umieszczony w kolejce do natychmiastowego uruchomienia. Aby wyświetlić DAG-i, kliknij DAG-i u góry strony.

84a29c71f20bff98.png

Kliknij composer_hadoop_tutorial, aby otworzyć stronę z informacjami o DAG. Ta strona zawiera graficzne przedstawienie zadań i zależności przepływu pracy.

f4f1663c7a37f47c.png

Teraz na pasku narzędzi kliknij Widok wykresu, a następnie najedź kursorem na grafikę każdego zadania, aby wyświetlić jego stan. Pamiętaj, że obramowanie wokół każdego zadania wskazuje też jego stan (zielone obramowanie = uruchomione, czerwone = niepowodzenie itd.).

4c5a0c6fa9f88513.png

Aby ponownie uruchomić przepływ pracy z poziomu widoku wykresu:

  1. W widoku wykresu interfejsu Airflow kliknij ikonę create_dataproc_cluster.
  2. Kliknij Wyczyść, aby zresetować te 3 zadania, a następnie kliknij OK, aby potwierdzić tę czynność.

fd1b23b462748f47.png

Stan i wyniki przepływu pracy w composer-hadoop-tutorial możesz też sprawdzić na tych stronach konsoli GCP:

  • Klastry Cloud Dataproc do monitorowania tworzenia i usuwania klastrów. Pamiętaj, że klaster utworzony przez przepływ pracy jest tymczasowy: istnieje tylko przez czas trwania przepływu pracy i jest usuwany w ramach ostatniego zadania przepływu pracy.
  • Cloud Dataproc Jobs, aby wyświetlić lub monitorować zadanie zliczania słów w Apache Hadoop. Kliknij identyfikator zadania, aby wyświetlić dane wyjściowe logu zadania.
  • Przeglądarka Cloud Storage, aby wyświetlić wyniki liczby słów w folderze wordcount w zasobniku Cloud Storage utworzonym w ramach tego ćwiczenia.

7. Czyszczenie

Oto czynności, które musisz wykonać, aby uniknąć obciążenia konta Google Cloud Platform opłatami za zasoby zużyte w tym Codelab:

  1. (Opcjonalnie) Aby zapisać dane, pobierz je z zasobnika Cloud Storage środowiska Cloud Composer i zasobnika utworzonego w tym laboratorium kodu.
  2. Usuń zasobnik Cloud Storage utworzony na potrzeby tego samouczka.
  3. Usuń zasobnik Cloud Storage dla środowiska.
  4. Usuń środowisko Cloud Composer. Pamiętaj, że usunięcie środowiska nie powoduje usunięcia jego zasobnika na dane.

Możesz też opcjonalnie usunąć projekt:

  1. W konsoli GCP otwórz stronę Projekty.
  2. Na liście projektów wybierz projekt do usunięcia, a następnie kliknij Usuń.
  3. W polu wpisz identyfikator projektu i kliknij Wyłącz, aby usunąć projekt.