Node.JS ve Google Cloud Functions ile DAG tetikleme

1. Giriş

Apache Airflow, DAG'leri düzenli olarak çalıştıracak şekilde tasarlanmıştır. Ancak bir Cloud Storage paketindeki değişiklik veya Cloud Pub/Sub'a aktarılan bir mesaj gibi etkinliklere yanıt olarak da DAG'leri tetikleyebilirsiniz. Bunun için Cloud Composer DAG'leri Cloud Functions tarafından tetiklenebilir.

Bu laboratuvardaki örnek, bir Cloud Storage paketinde her değişiklik gerçekleştiğinde basit bir DAG çalıştırır. Bu DAG, Cloud Storage paketine yüklenen öğelerle ilgili değişiklik bilgilerini yazdıran bir bash komutu çalıştırmak için BashOperator'ı kullanır.

Bu laboratuvara başlamadan önce Cloud Composer'a Giriş ve Cloud Functions'ı Kullanmaya Başlama codelab'lerini tamamlamanız önerilir. Cloud Composer codelab'e giriş bölümünde bir Composer Ortamı oluşturursanız söz konusu ortamı bu laboratuvarda kullanabilirsiniz.

Ne Oluşturacaksınız?

Bu codelab'de şunları yapacaksınız:

  1. Google Cloud Storage'a bir dosya yükleyerek
  2. Node.JS çalışma zamanını kullanarak Google Cloud Functions işlevi tetikleyin
  3. Bu işlev, Google Cloud Composer'da bir DAG yürütür
  4. Bu işlem, değişikliği Google Cloud Storage paketinde yazdıran basit bir bash komutu çalıştırır

1d3d3736624a923f.png

Neler Öğreneceksiniz?

  • Google Cloud Functions + Node.js kullanarak Apache Airflow DAG tetikleme

İhtiyacınız olanlar

  • GCP Hesabı
  • JavaScript'le ilgili temel bilgiler
  • Cloud Composer/Airflow ve Cloud Functions hakkında temel bilgiler
  • CLI komutlarını kullanma rahatlığı

2. GCP'yi Ayarlama

Projeyi Seçme veya Oluşturma

Bir Google Cloud Platform projesi seçin veya oluşturun. Yeni proje oluşturuyorsanız burada bulunan adımları uygulayın.

Sonraki adımlarda kullanacağınız proje kimliğinizi not edin.

Yeni bir proje oluşturuyorsanız proje kimliği, oluşturma sayfasındaki Proje Adı'nın hemen altında yer alır

Daha önce proje oluşturduysanız kimliği Proje Bilgileri kartındaki konsol ana sayfasında bulabilirsiniz

API'leri etkinleştirme

Cloud Composer, Google Cloud Functions ve Cloud Identity ile Google Identity and Access Management (IAM) API'yi etkinleştirin.

Composer Ortamı Oluşturma

Aşağıdaki yapılandırmaya sahip bir Cloud Composer ortamı oluşturun:

  • Ad: bestecim-ortamım
  • Konum: Size coğrafi olarak en yakın olan konum
  • Alt bölge: Söz konusu bölgedeki herhangi bir alt bölge

Diğer tüm yapılandırmalar varsayılan değerlerinde kalabilir. "Oluştur"u tıklayın. tıklayın. Composer Ortamınızın adını ve konumunu not edin. Sonraki adımlarda bunlara ihtiyacınız olacak.

Cloud Storage Paketi oluşturma

Projenizde aşağıdaki yapılandırmaya sahip bir Cloud Storage paketi oluşturun:

  • Ad: <proje-kimliğiniz>
  • Varsayılan depolama sınıfı: Çok bölgeli
  • Konum: Kullandığınız Cloud Composer bölgesine coğrafi olarak en yakın konum
  • Erişim Kontrol Modeli: Nesne ve paket düzeyinde izinler ayarlama

"Oluştur"a basın. Hazır olduğunuzda sonraki adımlar için Cloud Storage paketinizin adını not ettiğinizden emin olun.

3. Google Cloud Functions'ı (EBOB) ayarlama

GCF'yi ayarlamak için komutları Google Cloud Shell'de çalıştıracağız.

Google Cloud, gcloud komut satırı aracıyla dizüstü bilgisayarınızdan uzaktan çalıştırılabilse de bu codelab'de Cloud'da çalışan bir komut satırı ortamı olan Google Cloud Shell'i kullanacağız.

Bu Debian tabanlı sanal makine, ihtiyacınız olan tüm geliştirme araçlarıyla yüklüdür. 5 GB boyutunda kalıcı bir ana dizin sunar ve Google Cloud üzerinde çalışarak ağ performansını ve kimlik doğrulamasını büyük ölçüde iyileştirir. Yani bu codelab'de ihtiyacınız olan tek şey bir tarayıcıdır (Evet, Chromebook'ta çalışır).

Google Cloud Shell'i etkinleştirmek için geliştirici konsolundan sağ üstteki düğmeyi tıklayın (sağlayıp ortama bağlanmak için birkaç saniye gerekir):

Cloud Functions Hizmet Hesabı'na blob imzalama izinleri verin

GCF'nin Airflow web sunucusunu koruyan proxy olan Cloud IAP'de kimlik doğrulaması yapabilmesi için Appspot Hizmet Hesabı EBOB'sine Service Account Token Creator rolünü vermeniz gerekir. Bu işlemi Cloud Shell'inizde aşağıdaki komutu çalıştırarak <your-project-id> yerine projenizin adını kullanın.

gcloud iam service-accounts add-iam-policy-binding \
<your-project-id>@appspot.gserviceaccount.com \
--member=serviceAccount:<your-project-id>@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Örneğin, projenizin adı my-project ise komutunuz şöyle olur:

gcloud iam service-accounts add-iam-policy-binding \
my-project@appspot.gserviceaccount.com \
--member=serviceAccount:my-project@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

Client-ID'yi alma

Cloud IAP'de kimlik doğrulaması yapmak amacıyla bir jeton oluşturmak için işlev, Airflow web sunucusunu koruyan proxy'nin istemci kimliğini gerektirir. Cloud Composer API, bu bilgiyi doğrudan sağlamaz. Bunun yerine, Airflow web sunucusuna kimliği doğrulanmamış bir istek gönderin ve yönlendirme URL'sinden istemci kimliğini yakalayın. İstemci kimliğini yakalamak için Cloud Shell'de Python dosyası çalıştırarak bunu yapacağız.

Cloud Shell'inizde aşağıdaki komutu çalıştırarak GitHub'dan gerekli kodu indirin

cd
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

Bu dizin zaten mevcut olduğu için hata mesajı aldıysanız aşağıdaki komutu çalıştırarak dizini en son sürüme güncelleyin.

cd python-docs-samples/
git pull origin master

Şu komutu çalıştırarak uygun dizine geçin:

cd python-docs-samples/composer/rest

İstemci kimliğinizi almak için python kodunu çalıştırın. Bunun için projenizin adını <your-project-id>, <your-composer-location> için daha önce oluşturduğunuz Composer ortamının konumu ve <your-composer-environment> için daha önce oluşturduğunuz Composer ortamının adını değiştirin

python3 get_client_id.py <your-project-id> <your-composer-location> <your-composer-environment>

Örneğin, projenizin adı my-project, Composer konumunuz us-central1 ve ortamınız my-composer ise komutunuz şöyle olacaktır:

python3 get_client_id.py my-project us-central1 my-composer

get_client_id.py şunları yapar:

  • Google Cloud ile kimlik doğrular
  • Yönlendirme URI'sini almak için Airflow web sunucusuna kimliği doğrulanmamış bir HTTP isteği gönderir
  • Bu yönlendirmeden client_id sorgu parametresini çıkarır
  • Kullanmanız için yazdırır

İstemci kimliğiniz komut satırına yazdırılır ve aşağıdaki gibi görünür:

12345678987654321-abc1def3ghi5jkl7mno8pqr0.apps.googleusercontent.com

4. İşlevinizi Oluşturma

Cloud Shell'inizde şu komutu çalıştırarak depoyu gerekli örnek koduyla klonlayın:

cd
git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git

Gerekli dizine geçin ve sonraki birkaç adımı tamamlarken Cloud Shell'inizi açık bırakın

cd nodejs-docs-samples/composer/functions/composer-storage-trigger

Gezinme menüsünü ve ardından "Cloud Functions"ı tıklayarak Google Cloud Functions sayfasına gidin

"CREATE FUNCTION"ı (FUNCTION OLUŞTUR) tıklayın sayfanın en üstünde

İşlevinizi "my-function" olarak adlandırın ve belleği varsayılan değer olan 256 MB'ta bırakın.

Tetikleyiciyi "Cloud Storage" olarak ayarlayın, Etkinlik türünü "Sonlandır/Oluştur" olarak bırakın ve Cloud Storage Paketi Oluşturma adımında oluşturduğunuz pakete göz atın.

Kaynak Kodunu "Satır İçi Düzenleyici" olarak ayarlanmış şekilde bırakın ve çalışma zamanını "Node.js 8" olarak ayarlayın.

Cloud Shell'inizde aşağıdaki komutu çalıştırın. Bu işlem, index.js ve package.json öğelerini Cloud Shell Düzenleyici'de açar.

cloudshell edit index.js package.json

package.json sekmesini tıklayın, bu kodu kopyalayın ve Cloud Functions satır içi düzenleyicisinin package.json bölümüne yapıştırın.

"Yürütülecek İşlev"i ayarlayın tetiklemek içinDag

index.js sekmesini tıklayın, kodu kopyalayın ve Cloud Functions satır içi düzenleyicisinin index.js bölümüne yapıştırın

PROJECT_ID değerini proje kimliğinizle, CLIENT_ID değerini ise İstemci Kimliğini Edinme adımında kaydettiğiniz istemci kimliği ile değiştirin. "Oluştur"u TIKLAMAYIN Ancak, doldurulması gereken birkaç nokta daha var.

Cloud Shell'inizde <ortamınızın-adı> kısmını değiştirerek aşağıdaki komutu çalıştırın Composer ortamınızın adını ve <composer-region> ile değiştirin Composer Ortamınızın bulunduğu bölgeyle değiştirin.

gcloud composer environments describe <your-environment-name> --location <your-composer-region>

Örneğin, ortamınız my-composer-environment olarak adlandırılıyor ve us-central1 bölgesinde bulunuyorsa komutunuz şöyle olur:

gcloud composer environments describe my-composer-environment --location us-central1

Çıkış şu şekilde görünmelidir:

config:
 airflowUri: https://abc123efghi456k-tp.appspot.com
 dagGcsPrefix: gs://narnia-north1-test-codelab-jklmno-bucket/dags
 gkeCluster: projects/a-project/zones/narnia-north1-b/clusters/narnia-north1-test-codelab-jklmno-gke
 nodeConfig:
   diskSizeGb: 100
   location: projects/a-project/zones/narnia-north1-b
   machineType: projects/a-project/zones/narnia-north1-b/machineTypes/n1-standard-1
   network: projects/a-project/global/networks/default
   oauthScopes:
   - https://www.googleapis.com/auth/cloud-platform
   serviceAccount: 987665432-compute@developer.gserviceaccount.com
 nodeCount: 3
 softwareConfig:
   imageVersion: composer-1.7.0-airflow-1.10.0
   pythonVersion: '2'
createTime: '2019-05-29T09:41:27.919Z'
name: projects/a-project/locations/narnia-north1/environments/my-composer-environment
state: RUNNING
updateTime: '2019-05-29T09:56:29.969Z'
uuid: 123456-7890-9876-543-210123456

Bu çıkışta airflowUri adlı değişkeni bulun. index.js kodunuzda WEBSERVER_ID değerini Airflow web sunucusu kimliği olarak değiştirin. Bu kimlik, airflowUri değişkeninin "-tp" harfine sahip olan parçasıdır. sonda, örneğin, abc123efghi456k-tp

"Diğer"i tıklayın açılır bağlantıyı tıklayın, ardından size coğrafi olarak en yakın Bölgeyi seçin

"Başarısız Olduğunda Yeniden Dene"yi kontrol edin

"Oluştur"u tıklayın. Cloud Functions işlevinizi oluşturun

Kodun Ayrıntılarını Anlama

index.js'den kopyaladığınız kod aşağıdaki gibi görünür:

// [START composer_trigger]
'use strict';

const fetch = require('node-fetch');
const FormData = require('form-data');

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * IAP authorization based on:
 * https://stackoverflow.com/questions/45787676/how-to-authenticate-google-cloud-functions-for-access-to-secure-app-engine-endpo
 * and
 * https://cloud.google.com/iap/docs/authentication-howto
 *
 * @param {!Object} data The Cloud Functions event data.
 * @returns {Promise}
 */
exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

/**
 * @param {string} clientId The client id associated with the Composer webserver application.
 * @param {string} projectId The id for the project containing the Cloud Function.
 * @param {string} userAgent The user agent string which will be provided with the webserver request.
 */
const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

/**
 * @param {string} url The url that the post request targets.
 * @param {string} body The body of the post request.
 * @param {string} idToken Bearer token used to authorize the iap request.
 * @param {string} userAgent The user agent to identify the requester.
 */
const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};
// [END composer_trigger]

Neler olduğuna bakalım. Burada üç işlev vardır: triggerDag, authorizeIap ve makeIapPostRequest

triggerDag, belirtilen Cloud Storage paketine bir öğe yüklediğimizde tetiklenen işlevdir. Diğer isteklerde kullanılan PROJECT_ID, CLIENT_ID, WEBSERVER_ID ve DAG_NAME gibi önemli değişkenleri burada yapılandırırız. authorizeIap ve makeIapPostRequest çağrıları yapıyor.

exports.triggerDag = async data => {
  // Fill in your Composer environment information here.

  // The project that holds your function
  const PROJECT_ID = 'your-project-id';
  // Navigate to your webserver's login page and get this from the URL
  const CLIENT_ID = 'your-iap-client-id';
  // This should be part of your webserver's URL:
  // {tenant-project-id}.appspot.com
  const WEBSERVER_ID = 'your-tenant-project-id';
  // The name of the DAG you wish to trigger
  const DAG_NAME = 'composer_sample_trigger_response_dag';

  // Other constants
  const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
  const USER_AGENT = 'gcf-event-trigger';
  const BODY = {conf: JSON.stringify(data)};

  // Make the request
  try {
    const iap = await authorizeIap(CLIENT_ID, PROJECT_ID, USER_AGENT);

    return makeIapPostRequest(
      WEBSERVER_URL,
      BODY,
      iap.idToken,
      USER_AGENT,
      iap.jwt
    );
  } catch (err) {
    throw new Error(err);
  }
};

authorizeIap, bir hizmet hesabı kullanarak ve "değişim" yaparak Airflow web sunucusunu koruyan proxy'ye istek gönderir. makeIapPostRequest kimliğini doğrulamakta kullanılacak kimlik jetonu için bir JWT.

const authorizeIap = async (clientId, projectId, userAgent) => {
  const SERVICE_ACCOUNT = `${projectId}@appspot.gserviceaccount.com`;
  const JWT_HEADER = Buffer.from(
    JSON.stringify({alg: 'RS256', typ: 'JWT'})
  ).toString('base64');

  let jwt = '';
  let jwtClaimset = '';

  // Obtain an Oauth2 access token for the appspot service account
  const res = await fetch(
    `http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/${SERVICE_ACCOUNT}/token`,
    {
      headers: {'User-Agent': userAgent, 'Metadata-Flavor': 'Google'},
    }
  );
  const tokenResponse = await res.json();
  if (tokenResponse.error) {
    return Promise.reject(tokenResponse.error);
  }

  const accessToken = tokenResponse.access_token;
  const iat = Math.floor(new Date().getTime() / 1000);
  const claims = {
    iss: SERVICE_ACCOUNT,
    aud: 'https://www.googleapis.com/oauth2/v4/token',
    iat: iat,
    exp: iat + 60,
    target_audience: clientId,
  };
  jwtClaimset = Buffer.from(JSON.stringify(claims)).toString('base64');
  const toSign = [JWT_HEADER, jwtClaimset].join('.');

  const blob = await fetch(
    `https://iam.googleapis.com/v1/projects/${projectId}/serviceAccounts/${SERVICE_ACCOUNT}:signBlob`,
    {
      method: 'POST',
      body: JSON.stringify({
        bytesToSign: Buffer.from(toSign).toString('base64'),
      }),
      headers: {
        'User-Agent': userAgent,
        Authorization: `Bearer ${accessToken}`,
      },
    }
  );
  const blobJson = await blob.json();
  if (blobJson.error) {
    return Promise.reject(blobJson.error);
  }

  // Request service account signature on header and claimset
  const jwtSignature = blobJson.signature;
  jwt = [JWT_HEADER, jwtClaimset, jwtSignature].join('.');
  const form = new FormData();
  form.append('grant_type', 'urn:ietf:params:oauth:grant-type:jwt-bearer');
  form.append('assertion', jwt);

  const token = await fetch('https://www.googleapis.com/oauth2/v4/token', {
    method: 'POST',
    body: form,
  });
  const tokenJson = await token.json();
  if (tokenJson.error) {
    return Promise.reject(tokenJson.error);
  }

  return {
    jwt: jwt,
    idToken: tokenJson.id_token,
  };
};

makeIapPostRequest, composer_sample_trigger_response_dag. öğesini tetiklemek için Airflow web sunucusuna bir çağrı yapar. DAG adı, url parametresiyle iletilen Airflow web sunucusu URL'sine yerleştirilir ve idToken, authorizeIap isteğinde elde ettiğimiz jetondur.

const makeIapPostRequest = async (url, body, idToken, userAgent) => {
  const res = await fetch(url, {
    method: 'POST',
    headers: {
      'User-Agent': userAgent,
      Authorization: `Bearer ${idToken}`,
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    const err = await res.text();
    throw new Error(err);
  }
};

5. DAG'nizi Ayarlama

Cloud Shell'inizde örnek iş akışlarının yer aldığı dizine geçin. Bu dosya, İstemci Kimliğini Edinme adımında GitHub'dan indirdiğiniz python-docs-samples örneklerinin bir parçasıdır.

cd
cd python-docs-samples/composer/workflows

DAG'yi Composer'a yükleme

Örnek DAG'yi Composer ortamınızın DAG depolama paketine aşağıdaki komutla yükleyin. Burada <environment_name>, Composer ortamınızın, <location> ise bulunduğu bölgenin adıdır. trigger_response_dag.py, üzerinde çalışacağımız DAG'dir.

gcloud composer environments storage dags import \
    --environment <environment_name> \
    --location <location> \
    --source trigger_response_dag.py

Örneğin, Composer ortamınız my-composer olarak adlandırıldıysa ve us-central1 konumunda bulunuyorsa komutunuz şöyle olur:

gcloud composer environments storage dags import \
    --environment my-composer \
    --location us-central1 \
    --source trigger_response_dag.py

DAG'de Adım Atma

trigger_response.py içindeki DAG kodu aşağıdaki gibi görünür

import datetime
import airflow
from airflow.operators import bash_operator


default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

default_args bölümü, Apache Airflow'daki BaseOperator modelinin gerektirdiği varsayılan bağımsız değişkenleri içerir. Bu bölümü, herhangi bir Apache Airflow DAG'de bu parametrelerle görürsünüz. owner şu anda Composer Example olarak ayarlı, ancak isterseniz bunu adınız olarak değiştirebilirsiniz. depends_on_past, bu DAG'nin daha önceki herhangi bir DAG'ye bağlı olmadığını gösteriyor. Üç e-posta bölümü (email, email_on_failure ve email_on_retry) bu DAG'nin durumuna bağlı olarak e-posta bildirimi almayacak şekilde ayarlanmıştır. retries, 1 olarak ayarlandığından DAG yalnızca bir kez yeniden dener ve bu işlemi her retry_delay için beş dakika sonra yapar. start_date, normalde bir DAG'nin schedule_interval (daha sonra ayarlanır) ile birlikte ne zaman çalışması gerektiğini belirtir ancak bu DAG söz konusu olduğunda alakalı değildir. 1 Ocak 2017 olarak ayarlandı ancak geçmiş herhangi bir tarihe ayarlanabilir.

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017, 1, 1),
}

with airflow.DAG bölümü, çalıştırılacak DAG'yi yapılandırır. Görev kimliği composer_sample_trigger_response_dag, default_args bölümündeki varsayılan bağımsız değişkenler ve en önemlisi schedule_interval değeri None ile çalışacak. Bu DAG'yi Cloud Functions işlevimizle tetiklediğimiz için schedule_interval, None olarak ayarlanmıştır. Bu nedenle default_args içindeki start_date alakalı değildir.

DAG, çalıştırıldığında yapılandırmasını print_gcs_info değişkeninde belirtildiği şekilde yazdırır.

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        default_args=default_args,
        # Not scheduled, trigger only
        schedule_interval=None) as dag:

    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = bash_operator.BashOperator(
        task_id='print_gcs_info', bash_command='echo {{ dag_run.conf }}')

6. İşlevinizi Test Etme

Composer Ortamınızı açın ve ortamınızın adının bulunduğu satırda Airflow bağlantısını tıklayın.

Adını tıklayarak composer_sample_trigger_response_dag öğesini açın. Henüz çalıştırılmak üzere DAG'yi tetiklemediğimiz için şu anda DAG çalıştırıldığına dair herhangi bir kanıt bulunmamaktadır.Bu DAG görünür veya tıklanabilir değilse bir dakika bekleyip sayfayı yenileyin.

Ayrı bir sekme açın ve daha önce oluşturduğunuz ve Cloud Functions işleviniz için tetikleyici olarak belirttiğiniz Cloud Storage paketine herhangi bir dosyayı yükleyin. Bu işlemi Konsol veya gsutil komutu kullanarak yapabilirsiniz.

Airflow kullanıcı arayüzünüzü içeren sekmeye geri dönün ve Grafik Görünümü'nü tıklayın

Yeşil renkle özetlenmesi gereken print_gcs_info görevini tıklayın

"Günlüğü Görüntüle"yi tıklayın menünün sağ üst köşesinde

Günlüklerde, Cloud Storage paketinize yüklediğiniz dosyayla ilgili bilgiler gösterilir.

Tebrikler! Node.js ve Google Cloud Functions'ı kullanarak bir Airflow DAG'yi tetiklediniz.

7. Temizleme

Bu hızlı başlangıç kılavuzunda kullanılan kaynaklar için GCP hesabınızın ücretlendirilmesini önlemek amacıyla:

  1. (İsteğe bağlı) Verilerinizi kaydetmek istiyorsanız Cloud Composer ortamına ait Cloud Storage paketinden ve bu hızlı başlangıç kılavuzu için oluşturduğunuz depolama paketinden verileri indirin.
  2. Ortam için oluşturduğunuz ve oluşturduğunuz Cloud Storage paketini silin.
  3. Cloud Composer ortamını silin. Ortamınızı sildiğinizde ortama ait depolama paketinin silinmeyeceğini unutmayın.
  4. (İsteğe bağlı) Sunucusuz bilgi işlem ile ayda ilk 2 milyon çağrı ücretsizdir ve işlevinizi sıfıra ölçeklendirdiğinizde sizden ücret alınmaz (daha fazla bilgi için fiyatlandırmaya bakın). Ancak Cloud Functions işlevinizi silmek istiyorsanız "DELETE" (SİL) düğmesini tıklayarak bunu yapabilirsiniz. işlevinize ait genel bakış sayfasının sağ üst tarafındaki

4fe11e1b41b32ba2.png

İsterseniz projeyi silebilirsiniz:

  1. GCP Console'da Projeler sayfasına gidin.
  2. Proje listesinde, silmek istediğiniz projeyi seçin ve Sil'i tıklayın.
  3. Kutuya proje kimliğini yazın ve ardından projeyi silmek için Kapat'ı tıklayın.