Программное обеспечение качества данных с Dataplex и генеративным ИИ

1. Введение

Этот практический семинар предоставляет техническую схему для специалистов по работе с данными. Он описывает подход к управлению данными, основанный на принципе «сначала код», демонстрируя, как внедрить надежное управление качеством и метаданными непосредственно в цикл разработки. По своей сути, Dataplex Universal Catalog действует как интеллектуальная инфраструктура данных, позволяющая организациям централизованно управлять, отслеживать и контролировать данные по всей своей инфраструктуре — от озер данных до хранилищ данных.

В этом практическом занятии демонстрируется, как использовать Dataplex, BigQuery и Gemini CLI для упрощения сложных данных, программного профилирования, генерации интеллектуальных предложений по правилам качества данных и развертывания автоматизированных проверок качества. Основная цель — отказаться от ручных процессов, управляемых пользовательским интерфейсом (которые подвержены ошибкам и трудно масштабируются), и вместо этого создать надежную, управляемую версиями структуру «политики как код».

Предварительные требования

  • Базовое понимание консоли Google Cloud
  • Базовые навыки работы с командной строкой и Google Cloud Shell.

Что вы узнаете

  • Как преобразовать вложенные данные BigQuery в плоскую структуру с помощью материализованных представлений для обеспечения комплексного профилирования.
  • Как программно запускать и управлять сканированием профилей Dataplex с помощью клиентской библиотеки Dataplex для Python.
  • Как экспортировать данные профиля и структурировать их в качестве входных данных для модели генеративного искусственного интеллекта.
  • Как настроить запрос для Gemini CLI, чтобы он анализировал данные профиля и генерировал файл правил YAML, совместимый с Dataplex.
  • Важность интерактивного процесса с участием человека (HITL) для проверки конфигураций, сгенерированных искусственным интеллектом.
  • Как развернуть сгенерированные правила в качестве автоматизированной проверки качества данных.

Что вам понадобится

  • Аккаунт Google Cloud и проект Google Cloud
  • Веб-браузер, например Chrome.

Ключевые понятия: основы качества данных Dataplex.

Понимание основных компонентов Dataplex имеет решающее значение для построения эффективной стратегии обеспечения качества данных.

  • Сканирование профиля данных: задача Dataplex, которая анализирует данные и генерирует статистические метаданные, включая процент нулевых значений, количество уникальных значений и распределение значений. Это служит нашей программной фазой «обнаружения».
  • Правила качества данных: Декларативные утверждения, определяющие условия, которым должны соответствовать ваши данные (например, NonNullExpectation , SetExpectation , RangeExpectation ).
  • Генеративный ИИ для предложения правил: использование большой языковой модели (например, Gemini) для анализа профиля данных и предложения соответствующих правил качества данных. Это ускоряет процесс определения базовой структуры качества.
  • Проверка качества данных: задача Dataplex, которая проверяет данные на соответствие набору предопределенных или пользовательских правил.
  • Программное управление: центральная тема управления средствами контроля (такими как правила качества) в виде кода (например, в файлах YAML и скриптах Python). Это позволяет автоматизировать процессы, управлять версиями и интегрировать систему в конвейеры CI/CD.
  • Человек в цикле обработки информации (Human-in-the-Loop, HITL): критически важная контрольная точка интеграции экспертных знаний и контроля человека в автоматизированный рабочий процесс. Для конфигураций, генерируемых ИИ, HITL необходим для проверки корректности, актуальности для бизнеса и безопасности предлагаемых решений перед их внедрением.

2. Настройка и требования

Запустить Cloud Shell

Хотя Google Cloud можно управлять удаленно с ноутбука, в этом практическом занятии вы будете использовать Google Cloud Shell — среду командной строки, работающую в облаке.

В консоли Google Cloud нажмите на значок Cloud Shell на панели инструментов в правом верхнем углу:

Активируйте Cloud Shell

Подготовка и подключение к среде займут всего несколько минут. После завершения вы должны увидеть примерно следующее:

Скриншот терминала Google Cloud Shell, показывающий, что среда подключена.

Эта виртуальная машина содержит все необходимые инструменты разработки. Она предоставляет постоянный домашний каталог объемом 5 ГБ и работает в облаке Google, что значительно повышает производительность сети и аутентификацию. Вся работа в этом практическом задании может выполняться в браузере. Вам не нужно ничего устанавливать.

Включите необходимые API и настройте среду.

Внутри Cloud Shell убедитесь, что идентификатор вашего проекта указан правильно:

export PROJECT_ID=$(gcloud config get-value project)
gcloud config set project $PROJECT_ID
export LOCATION="us-central1"
export BQ_LOCATION="us"
export DATASET_ID="dataplex_dq_codelab"
export TABLE_ID="ga4_transactions"

gcloud services enable dataplex.googleapis.com \
                       bigquery.googleapis.com \
                       serviceusage.googleapis.com

В примере мы используем us (многорегиональный регион) в качестве местоположения, поскольку общедоступные примеры данных, которые мы будем использовать, также находятся в us (многорегиональный регион). BigQuery требует, чтобы исходные данные и целевая таблица для запроса находились в одном и том же месте.

Создайте выделенный набор данных BigQuery.

Создайте новый набор данных BigQuery для размещения наших тестовых данных и результатов.

bq --location=us mk --dataset $PROJECT_ID:$DATASET_ID

Подготовьте выборочные данные

Для этого практического занятия вы будете использовать общедоступный набор данных, содержащий зашифрованные данные электронной коммерции из магазина Google Merchandise Store. Поскольку общедоступные наборы данных доступны только для чтения, вам необходимо создать изменяемую копию в вашем собственном наборе данных. Следующая команда bq создаст новую таблицу ga4_transactions в вашем наборе данных dataplex_dq_codelab . Она скопирует данные за один день (31.01.2021), чтобы обеспечить быстрое выполнение сканирования.

bq query \
--use_legacy_sql=false \
--destination_table=$PROJECT_ID:$DATASET_ID.$TABLE_ID \
--replace=true \
'SELECT * FROM `bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_20210131`'

Настройте каталог демонстрационных файлов.

Для начала вам нужно будет клонировать репозиторий GitHub, содержащий необходимую структуру папок и вспомогательные файлы для этого практического занятия.

git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/programmatic-dq

Эта директория теперь является вашей активной рабочей областью. Все последующие файлы будут создаваться здесь.

3. Автоматизированный поиск данных с помощью профилирования Dataplex.

Профилирование данных с помощью Dataplex — это мощный инструмент для автоматического выявления статистической информации о ваших данных, такой как процент нулевых значений, уникальность и распределение значений. Этот процесс необходим для понимания структуры и качества ваших данных. Однако известным ограничением профилирования Dataplex является его неспособность полностью проверять вложенные или повторяющиеся поля (например, типы RECORD или ARRAY ) в таблице. Он может определить, что столбец имеет сложный тип, но не может профилировать отдельные поля внутри этой вложенной структуры.

Для решения этой проблемы мы преобразуем данные в специально созданные материализованные представления. Эта стратегия делает каждое поле столбцом верхнего уровня, что позволяет Dataplex анализировать каждое из них индивидуально.

Понимание вложенной схемы

Для начала рассмотрим схему нашей исходной таблицы. Набор данных Google Analytics 4 (GA4) содержит несколько вложенных и повторяющихся столбцов. Чтобы программно получить полную схему, включая все вложенные структуры, вы можете использовать команду bq show и сохранить результат в виде файла JSON.

bq show --schema --format=json $PROJECT_ID:$DATASET_ID.$TABLE_ID > bq_schema.json

Анализ файла bq_schema.json выявляет сложные структуры, такие как устройства, географические местоположения, электронная коммерция и повторяющиеся записи. Именно эти структуры требуют преобразования в плоскую структуру для эффективного профилирования.

Сглаживание данных с помощью материализованных представлений

Создание материализованных представлений (МВ) — наиболее эффективное и практичное решение проблемы вложенных данных. Предварительно вычисляя результаты в плоском виде, МВ обеспечивают значительные преимущества в производительности запросов и снижении затрат, предоставляя при этом более простую, похожую на реляционную структуру для аналитиков и инструментов профилирования.

Первая мысль, которая приходит в голову, — это объединить всё в одно большое представление. Однако этот интуитивный подход скрывает опасную ловушку, которая может привести к серьёзному повреждению данных. Давайте разберёмся, почему это серьёзная ошибка.

  1. mv_ga4_user_session_flat.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_user_session_flat`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, event_name, user_pseudo_id, user_id, stream_id, platform,
  device.category AS device_category,
  device.operating_system AS device_os,
  device.operating_system_version AS device_os_version,
  device.language AS device_language,
  device.web_info.browser AS device_browser,
  geo.continent AS geo_continent,
  geo.country AS geo_country,
  geo.region AS geo_region,
  geo.city AS geo_city,
  traffic_source.name AS traffic_source_name,
  traffic_source.medium AS traffic_source_medium,
  traffic_source.source AS traffic_source_source
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`;
  1. mv_ga4_ecommerce_transactions.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_transactions`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, user_pseudo_id, ecommerce.transaction_id,
  ecommerce.total_item_quantity,
  ecommerce.purchase_revenue_in_usd,
  ecommerce.purchase_revenue,
  ecommerce.refund_value_in_usd,
  ecommerce.refund_value,
  ecommerce.shipping_value_in_usd,
  ecommerce.shipping_value,
  ecommerce.tax_value_in_usd,
  ecommerce.tax_value,
  ecommerce.unique_items
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`
WHERE
  ecommerce.transaction_id IS NOT NULL;
  1. mv_ga4_ecommerce_items.sql
CREATE OR REPLACE MATERIALIZED VIEW `$PROJECT_ID.$DATASET_ID.mv_ga4_ecommerce_items`
OPTIONS (
  enable_refresh = true,
  refresh_interval_minutes = 30
) AS
SELECT
  event_date, event_timestamp, event_name, user_pseudo_id, ecommerce.transaction_id,
  item.item_id,
  item.item_name,
  item.item_brand,
  item.item_variant,
  item.item_category,
  item.item_category2,
  item.item_category3,
  item.item_category4,
  item.item_category5,
  item.price_in_usd,
  item.price,
  item.quantity,
  item.item_revenue_in_usd,
  item.item_revenue,
  item.coupon,
  item.affiliation,
  item.item_list_name,
  item.promotion_name
FROM
  `$PROJECT_ID.$DATASET_ID.ga4_transactions`,
  UNNEST(items) AS item
WHERE
  ecommerce.transaction_id IS NOT NULL;

Теперь выполните эти шаблоны с помощью инструмента командной строки bq. Команда envsubst прочитает каждый файл, заменит переменные, такие как $PROJECT_ID и $DATASET_ID , их значениями из вашей среды оболочки и перенаправит окончательный, корректный SQL-запрос в команду запроса bq.

envsubst < mv_ga4_user_session_flat.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_transactions.sql | bq query --use_legacy_sql=false
envsubst < mv_ga4_ecommerce_items.sql | bq query --use_legacy_sql=false

Выполните сканирование профилей с помощью клиента Python.

Теперь, когда у нас есть сглаженные представления, пригодные для профилирования, мы можем программно создавать и запускать сканирование профилирования данных Dataplex для каждого из них. Следующий скрипт на Python использует клиентскую библиотеку google-cloud-dataplex для автоматизации этого процесса.

Перед запуском скрипта крайне важно создать изолированную среду Python в каталоге вашего проекта. Это гарантирует, что зависимости проекта будут управляться отдельно, предотвращая конфликты с другими пакетами в вашей среде Cloud Shell.

# Create the virtual environment
python3 -m venv dq_venv

# Activate the environment
source dq_venv/bin/activate

Теперь установите клиентскую библиотеку Dataplex в только что активированную среду.

# Install the Dataplex client library
pip install google-cloud-dataplex

После настройки среды и установки библиотеки вы готовы создать скрипт оркестровки.

На панели инструментов Cloud Shell нажмите «Открыть редактор». Создайте новый файл с именем 1_run_dataplex_scans.py и вставьте в него следующий код на Python. Если вы клонируете репозиторий GitHub, этот файл уже будет находиться в вашей папке.

Этот скрипт создаст запрос на сканирование для каждого материализованного представления (если оно еще не существует), запустит сканирование, а затем будет опрашивать систему до завершения всех заданий сканирования.

import os
import sys
import time
from google.cloud import dataplex_v1
from google.api_core.exceptions import AlreadyExists


def create_and_run_scan(
    client: dataplex_v1.DataScanServiceClient,
    project_id: str,
    location: str,
    data_scan_id: str,
    target_resource: str,
) -> dataplex_v1.DataScanJob | None:
    """
    Creates and runs a single data profile scan.
    Returns the executed Job object without waiting for completion.
    """
    parent = client.data_scan_path(project_id, location, data_scan_id).rsplit('/', 2)[0]
    scan_path = client.data_scan_path(project_id, location, data_scan_id)

    # 1. Create Data Scan (skips if it already exists)
    try:
        data_scan = dataplex_v1.DataScan()
        data_scan.data.resource = target_resource
        data_scan.data_profile_spec = dataplex_v1.DataProfileSpec()

        print(f"[INFO] Creating data scan '{data_scan_id}'...")
        client.create_data_scan(
            parent=parent,
            data_scan=data_scan,
            data_scan_id=data_scan_id
        ).result()  # Wait for creation to complete
        print(f"[SUCCESS] Data scan '{data_scan_id}' created.")
    except AlreadyExists:
        print(f"[INFO] Data scan '{data_scan_id}' already exists. Skipping creation.")
    except Exception as e:
        print(f"[ERROR] Error creating data scan '{data_scan_id}': {e}")
        return None

    # 2. Run Data Scan
    try:
        print(f"[INFO] Running data scan '{data_scan_id}'...")
        run_response = client.run_data_scan(name=scan_path)
        print(f"[SUCCESS] Job started for '{data_scan_id}'. Job ID: {run_response.job.name.split('/')[-1]}")
        return run_response.job
    except Exception as e:
        print(f"[ERROR] Error running data scan '{data_scan_id}': {e}")
        return None


def main():
    """Main execution function"""
    # --- Load configuration from environment variables ---
    PROJECT_ID = os.environ.get("PROJECT_ID")
    LOCATION = os.environ.get("LOCATION")
    DATASET_ID = os.environ.get("DATASET_ID")

    if not all([PROJECT_ID, LOCATION, DATASET_ID]):
        print("[ERROR] One or more required environment variables are not set.")
        print("Please ensure PROJECT_ID, LOCATION, and DATASET_ID are exported in your shell.")
        sys.exit(1)

    print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}, Dataset: {DATASET_ID}")

    # List of Materialized Views to profile
    TARGET_VIEWS = [
        "mv_ga4_user_session_flat",
        "mv_ga4_ecommerce_transactions",
        "mv_ga4_ecommerce_items"
    ]
    # ----------------------------------------------------

    client = dataplex_v1.DataScanServiceClient()
    running_jobs = []

    # 1. Create and run jobs for all target views
    print("\n--- Starting Data Profiling Job Creation and Execution ---")
    for view_name in TARGET_VIEWS:
        data_scan_id = f"profile-scan-{view_name.replace('_', '-')}"
        target_resource = f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET_ID}/tables/{view_name}"

        job = create_and_run_scan(client, PROJECT_ID, LOCATION, data_scan_id, target_resource)
        if job:
            running_jobs.append(job)
    print("-------------------------------------------------------\n")

    if not running_jobs:
        print("[ERROR] No jobs were started. Exiting.")
        return

    # 2. Poll for all jobs to complete
    print("--- Monitoring job completion status (checking every 30 seconds) ---")
    completed_jobs = {}

    while running_jobs:
        jobs_to_poll_next = []

        print(f"\n[STATUS] Checking status for {len(running_jobs)} running jobs...")

        for job in running_jobs:
            job_id_short = job.name.split('/')[-1][:13] 
            try:
                updated_job = client.get_data_scan_job(name=job.name)
                state = updated_job.state

                if state in (dataplex_v1.DataScanJob.State.RUNNING, dataplex_v1.DataScanJob.State.PENDING, dataplex_v1.DataScanJob.State.CANCELING):
                    print(f"  - Job {job_id_short}... Status: {state.name}")
                    jobs_to_poll_next.append(updated_job) 
                else:
                    print(f"  - Job {job_id_short}... Status: {state.name} (Complete)")
                    completed_jobs[job.name] = updated_job

            except Exception as e:
                print(f"[ERROR] Could not check status for job {job_id_short}: {e}")

        running_jobs = jobs_to_poll_next

        if running_jobs:
            time.sleep(30)

    # 3. Print final results
    print("\n--------------------------------------------------")
    print("[SUCCESS] All data profiling jobs have completed.")
    print("\nFinal Job Status Summary:")
    for job_name, job in completed_jobs.items():
        job_id_short = job_name.split('/')[-1][:13]
        print(f"  - Job {job_id_short}: {job.state.name}")
        if job.state == dataplex_v1.DataScanJob.State.FAILED:
            print(f"    - Failure Message: {job.message}")

    print("\nNext step: Analyze the profile results and generate quality rules.")


if __name__ == "__main__":
    main()

Теперь выполните скрипт из терминала Cloud Shell.

python 1_run_dataplex_scans.py

Теперь скрипт будет управлять профилированием ваших трех материализованных представлений, предоставляя обновления статуса в режиме реального времени. После завершения у вас будет подробный, машиночитаемый статистический профиль для каждого представления, готовый к следующему этапу нашего рабочего процесса: генерации правил качества данных на основе искусственного интеллекта.

Результаты сканирования профилей можно посмотреть в консоли Google Cloud.

  1. В навигационном меню перейдите в каталог Dataplex Universal и в раздел «Управление» выберите «Профиль».

5acda859404968c.png

  1. Вы должны увидеть список из трех сканирований вашего профиля, а также их последний статус выполнения. Вы можете щелкнуть по сканированию, чтобы просмотреть подробные результаты.

8a09dae0ef485289.png

От профиля BigQuery до входных данных, готовых к использованию в ИИ.

Сканирование профилей Dataplex успешно завершено. Хотя результаты доступны в API Dataplex, для использования их в качестве входных данных для модели генеративного ИИ необходимо извлечь их в структурированный локальный файл.

Следующий скрипт на Python, 2_dq_profile_save.py , программно находит последнее успешно выполненное задание сканирования профиля для нашего представления mv_ga4_user_session_flat . Затем он извлекает полный, подробный результат профилирования и сохраняет его в локальный JSON-файл с именем dq_profile_results.json . Этот файл будет служить непосредственным входным параметром для нашего анализа с помощью ИИ на следующем шаге.

В редакторе Cloud Shell создайте новый файл с именем 2_dq_profile_save.py и вставьте в него следующий код. Как и в предыдущем шаге, вы можете пропустить создание файла, если клонировали репозиторий.

import os
import sys
import json
from google.cloud import dataplex_v1
from google.api_core.exceptions import NotFound
from google.protobuf.json_format import MessageToDict

# --- Configuration ---
# The Materialized View to analyze is fixed for this step.
TARGET_VIEW = "mv_ga4_user_session_flat"
OUTPUT_FILENAME = "dq_profile_results.json"


def save_to_json_file(content: dict, filename: str):
    """Saves the given dictionary content to a JSON file."""
    try:
        with open(filename, "w", encoding="utf-8") as f:
            # Use indent=2 for a readable, "pretty-printed" JSON file.
            json.dump(content, f, indent=2, ensure_ascii=False)
        print(f"\n[SUCCESS] Profile results were saved to '{filename}'.")
    except (IOError, TypeError) as e:
        print(f"[ERROR] An error occurred while saving the file: {e}")


def get_latest_successful_job(
    client: dataplex_v1.DataScanServiceClient,
    project_id: str,
    location: str,
    data_scan_id: str
) -> dataplex_v1.DataScanJob | None:
    """Finds and returns the most recently succeeded job for a given data scan."""
    scan_path = client.data_scan_path(project_id, location, data_scan_id)
    print(f"\n[INFO] Looking for the latest successful job for scan '{data_scan_id}'...")

    try:
        # List all jobs for the specified scan, which are ordered most-recent first.
        jobs_pager = client.list_data_scan_jobs(parent=scan_path)

        # Iterate through jobs to find the first one that succeeded.
        for job in jobs_pager:
            if job.state == dataplex_v1.DataScanJob.State.SUCCEEDED:
                return job

        # If no successful job is found after checking all pages.
        return None
    except NotFound:
        print(f"[WARN] No scan history found for '{data_scan_id}'.")
        return None


def main():
    """Main execution function."""
    # --- Load configuration from environment variables ---
    PROJECT_ID = os.environ.get("PROJECT_ID")
    LOCATION = os.environ.get("LOCATION")

    if not all([PROJECT_ID, LOCATION]):
        print("[ERROR] Required environment variables PROJECT_ID or LOCATION are not set.")
        sys.exit(1)

    print(f"[INFO] Using Project: {PROJECT_ID}, Location: {LOCATION}")
    print(f"--- Starting Profile Retrieval for: {TARGET_VIEW} ---")

    # Construct the data_scan_id based on the target view name.
    data_scan_id = f"profile-scan-{TARGET_VIEW.replace('_', '-')}"

    # 1. Initialize Dataplex client and get the latest successful job.
    client = dataplex_v1.DataScanServiceClient()
    latest_job = get_latest_successful_job(client, PROJECT_ID, LOCATION, data_scan_id)

    if not latest_job:
        print(f"\n[ERROR] No successful job record was found for '{data_scan_id}'.")
        print("Please ensure the 'run_dataplex_scans.py' script has completed successfully.")
        return

    job_id_short = latest_job.name.split('/')[-1]
    print(f"[SUCCESS] Found the latest successful job: '{job_id_short}'.")

    # 2. Fetch the full, detailed profile result for the job.
    print(f"[INFO] Retrieving detailed profile results for job '{job_id_short}'...")
    try:
        request = dataplex_v1.GetDataScanJobRequest(
            name=latest_job.name,
            view=dataplex_v1.GetDataScanJobRequest.DataScanJobView.FULL,
        )
        job_with_full_results = client.get_data_scan_job(request=request)
    except Exception as e:
        print(f"[ERROR] Failed to retrieve detailed job results: {e}")
        return

    # 3. Convert the profile result to a dictionary and save it to a JSON file.
    if job_with_full_results.data_profile_result:
        profile_dict = MessageToDict(job_with_full_results.data_profile_result._pb)
        save_to_json_file(profile_dict, OUTPUT_FILENAME)
    else:
        print("[WARN] The job completed, but no data profile result was found within it.")

    print("\n[INFO] Script finished successfully.")


if __name__ == "__main__":
    main()

Теперь запустите скрипт из терминала:

python 2_dq_profile_save.py

После успешного завершения в вашей директории появится новый файл с именем dq_profile_results.json . Этот файл содержит подробные статистические метаданные, которые мы будем использовать для генерации правил качества данных. Чтобы проверить содержимое файла dq_profile_results.json , выполните следующую команду:

cat dq_profile_results.json

4. Создание правил качества данных с помощью интерфейса командной строки Gemini.

Установите и настройте Gemini CLI.

Хотя вы можете вызывать API Gemini программно, использование такого инструмента, как Gemini CLI, предлагает мощный интерактивный способ интеграции генеративного ИИ непосредственно в ваши рабочие процессы в терминале. Gemini CLI — это не просто чат-бот; это инструмент командной строки, который может читать ваши локальные файлы, понимать ваш код и взаимодействовать с другими системными инструментами, такими как gcloud, для автоматизации сложных задач. Это делает его идеальным для нашего случая.

Предварительное условие

Во-первых, убедитесь, что у вас есть необходимые предварительные условия: в вашей среде Cloud Shell должна быть установлена ​​версия Node.js 20 или выше. Проверить версию можно, выполнив команду node -v .

Установка

Существует два способа использования Gemini CLI: временная установка или более постоянная установка. Мы рассмотрим оба метода здесь.

Вы можете запустить Gemini CLI напрямую на один сеанс без какой-либо постоянной установки. Это самый чистый и быстрый способ «протестировать», поскольку он сохраняет вашу среду полностью неизменной.

В терминале Cloud Shell выполните следующую команду:

npx https://github.com/google-gemini/gemini-cli

Эта команда временно загружает и запускает пакет командной строки.

Для любого реального проекта рекомендуется установить CLI локально в каталог проекта. Такой подход имеет несколько ключевых преимуществ:

  • Изоляция зависимостей: она гарантирует, что у вашего проекта будет своя собственная версия CLI, предотвращая конфликты версий с другими проектами.
  • Воспроизводимость: Любой, кто клонирует ваш проект, сможет установить точно такие же зависимости, что делает вашу конфигурацию надежной и переносимой.
  • Соответствие лучшим практикам: оно следует стандартной модели управления зависимостями проекта Node.js, избегая недостатков глобальных (-g) установок.

Для локальной установки CLI выполните следующую команду из папки вашего проекта ( programmatic-dq ):

npm install @google/gemini-cli

Это создаст папку node_modules внутри programmatic-dq. Чтобы запустить только что установленную версию, используйте команду npx.

npx gemini

Первоначальная настройка

Какой бы метод вы ни выбрали, при первом запуске CLI система проведет вас через одноразовый процесс настройки.

8a25fab5951c6c39.png

Вам будет предложено выбрать цветовую тему, а затем пройти аутентификацию. Самый простой способ — войти в систему с помощью своей учетной записи Google, когда появится соответствующий запрос. Бесплатного тарифа достаточно для выполнения этого задания.

Теперь, когда CLI установлен и настроен, вы готовы приступить к созданию правил. CLI знает, какие файлы находятся в текущем каталоге, что крайне важно для следующего шага.

Сгенерируйте правила качества данных.

Хотя можно попросить LLM сгенерировать конфигурационный файл за один раз, недетерминированная природа генеративных моделей означает, что результат не всегда может идеально соответствовать строгой схеме, требуемой такими инструментами, как gcloud. Более надежным методом является интерактивный многоэтапный процесс, в котором сначала ИИ выступает в роли аналитика, предлагая план, вы (эксперт-человек) рассматриваете и утверждаете этот план, и только после этого ИИ генерирует окончательный код на основе ваших утвержденных инструкций.

Такой подход превращает процесс из простой команды в совместную работу, гарантируя точность, проверку и готовность конечного результата к развертыванию.

Сначала мы попросим Gemini проанализировать статистический профиль и выступить в роли эксперта по обработке данных, предложив план действий. Мы прямо указываем ему пока не записывать YAML-файл. Это позволит сосредоточить его внимание на анализе и обосновании.

В интерактивной сессии Gemini CLI вставьте следующую подсказку:

cd9792a6f1eb9f3.png

You are an expert Google Cloud Dataplex engineer. 
Your first task is to create a plan. I have a file in the current directory named ./dq_profile_results.json. 
Based on the statistical data within that file, propose a step-by-step plan to create a Dataplex data quality rules file. 
Your plan should identify which specific columns are good candidates for rules like nonNullExpectation, setExpectation, or rangeExpectation, and explain why based on the metrics (e.g., "Plan to create a nonNullExpectation for column X because its null percentage is 0%"). 
Do not write the YAML file yet. Just provide the plan.

Gemini проанализирует JSON-файл и должен вернуть структурированный план, похожий на этот:

Plan to create Dataplex data quality rules

1. Create a row count rule:
  - Analysis: The profile shows a total of 26,489 rows.
  - Proposed Rule: I will implement a rowCount rule to ensure the table's row count remains within a healthy range (e.g., 20,000 to 30,000). This is a fundamental check to guard against major data loading failures or unexpected deletions.
2. Implement non-null expectation rules:
  - Analysis: The profiling results show that many columns have no null values. For example, event_date, event_timestamp, event_name, user_pseudo_id, stream_id, and platform all appear to be fully populated.
  - Proposed Rules: I will create nonNullExpectation rules for columns where the nullRatio is 0 or not present, indicating they are always expected to contain a value:
    - event_date
    - event_timestamp
    - event_name
    - user_pseudo_id
    - stream_id
    - platform
    - device_category
    - device_os
3. Implement set expectation rules:
  - Analysis: Several columns contain a small, finite set of values, making them ideal for set validation.
  - Proposed Rules: I will create setExpectation rules for these columns:
    - platform: It only contains the value "WEB".
    - device_category: It contains only "desktop", "mobile", and "tablet".
    - geo_continent: It has a well-defined list of continents.
traffic_source_medium: It has a small list of valid mediums like "organic", "(none)", and "referral".
4. Implement range expectation rules:
  - Analysis: The profile provides minimum and maximum values for numeric and string length fields.
  - Proposed Rules: I will create rangeExpectation rules for the following:
    - event_timestamp: The profile shows a clear min and max. I will use these values to create a range check.
    - event_date (String Length): The profile indicates that the minLength and maxLength are both 8, which is perfect for enforcing a YYYYMMDD format.

Это самый важный этап во всем рабочем процессе: проверка с участием человека (Human-in-the-Loop, HITL). План, сгенерированный Gemini, основан исключительно на статистических закономерностях в данных. Он не учитывает контекст вашего бизнеса, будущие изменения данных или конкретное предназначение, стоящее за вашими данными. Ваша роль как эксперта-человека заключается в проверке, корректировке и утверждении этого плана, прежде чем превратить его в код.

Внимательно изучите план, предоставленный компанией Gemini.

  • Это имеет смысл?
  • Соответствует ли это вашим знаниям в области бизнеса?
  • Существуют ли какие-либо статистически обоснованные, но практически бесполезные правила?

Результаты, полученные с помощью Gemini, могут отличаться. Ваша задача — их улучшить.

Например, представьте, что план предлагает правило rowCount поскольку таблица в выборке данных имеет фиксированное количество строк. Как эксперт, вы, вероятно, знаете, что размер этой таблицы, как ожидается, будет расти ежедневно, что делает строгое правило подсчета строк непрактичным и, скорее всего, приведет к ложным срабатываниям. Это прекрасный пример применения бизнес-контекста, которого не хватает ИИ.

Теперь вам нужно отправить обратную связь в Gemini и дать окончательную команду на генерацию кода. Следующее сообщение необходимо адаптировать в соответствии с фактически полученным планом и внесенными вами исправлениями.

Приведённый ниже шаблон — это текст-задание . В первой строке вы должны указать свои конкретные исправления. Если предложенный вам Gemini план идеален и не требует изменений, вы можете просто удалить эту строку.

В той же сессии Gemini вставьте адаптированную версию следующего запроса:

[YOUR CORRECTIONS AND APPROVAL GO HERE. Examples:
- "The plan looks good. Please proceed."
- "The rowCount rule is not necessary, as the table size changes daily. The rest of the plan is approved. Please proceed."
- "For the setExpectation on the geo_continent column, please also include 'Antarctica'."]

Once you have incorporated my feedback, please generate the `dq_rules.yaml` file.

You must adhere to the following strict requirements:

- Schema Compliance: The YAML structure must strictly conform to the DataQualityRule specification. For a definitive source of truth, you must refer to the sample_rule.yaml file in the current directory and the DataQualityRule class definition in the local virtual environment path: ./dq_venv/.../google/cloud/dataplex_v1/types/data_quality.py.

- Data-Driven Values: All rule parameters, such as thresholds or expected values, must be derived directly from the statistical metrics in dq_profile_results.json.

- Rule Justification: For each rule, add a comment (#) on the line above explaining the justification, as you outlined in your plan.

- Output Purity: The final output must only be the raw YAML code block, perfectly formatted and ready for immediate deployment.

Теперь Gemini сгенерирует содержимое YAML на основе ваших точных инструкций, проверенных человеком. После завершения вы найдете новый файл с именем dq_rules.yaml в вашей рабочей директории.

Создайте и запустите проверку качества данных.

Теперь, когда у вас есть сгенерированный ИИ и проверенный человеком файл dq_rules.yaml , вы можете смело его развернуть.

Чтобы выйти из Gemini CLI, введите /quit или дважды нажмите Ctrl+C .

Следующая команда gcloud создает новый ресурс сканирования данных Dataplex. Она еще не запускает сканирование; она просто регистрирует определение и конфигурацию сканирования (наш YAML-файл) в Dataplex.

Выполните эту команду в терминале:

export DQ_SCAN="dq-scan"
gcloud dataplex datascans create data-quality $DQ_SCAN \
    --project=$PROJECT_ID \
    --location=$LOCATION \
    --data-quality-spec-file=dq_rules.yaml \
    --data-source-resource="//bigquery.googleapis.com/projects/$PROJECT_ID/datasets/$DATASET_ID/tables/mv_ga4_user_session_flat"

После определения параметров сканирования вы можете запустить задание для его выполнения.

gcloud dataplex datascans run $DQ_SCAN --location=$LOCATION --project=$PROJECT_ID

Эта команда выведет идентификатор задания. Вы можете отслеживать статус этого задания в разделе Dataplex в консоли Google Cloud. После завершения результаты будут записаны в таблицу BigQuery для анализа.

5. Критическая роль участия человека в процессе принятия решений (Human-In-The-Loop, HITL)

Хотя использование Gemini для ускорения генерации правил невероятно эффективно, крайне важно рассматривать ИИ как высококвалифицированного второго пилота, а не как полностью автономного пилота. Процесс «человек в контуре управления» (Human-in-the-Loop, HITL) — это не просто рекомендация, а обязательный, основополагающий шаг в любом надежном и заслуживающем доверия рабочем процессе управления данными. Простое развертывание артефактов, сгенерированных ИИ, без тщательного контроля со стороны человека — это верный путь к провалу.

Представьте себе сгенерированный ИИ файл dq_rules.yaml как запрос на слияние, отправленный чрезвычайно быстрым, но неопытным разработчиком ИИ. Он требует тщательной проверки со стороны опытного эксперта — вас — прежде чем его можно будет объединить с «основной веткой» вашей политики управления и развернуть. Эта проверка необходима для устранения присущих большим языковым моделям недостатков.

Вот подробное объяснение того, почему эта проверка человеком незаменима и на что именно вам следует обратить внимание:

1. Контекстная проверка: ИИ не обладает пониманием бизнес-процессов.

  • Слабое место магистра права (LLM) : магистр права — мастер в области закономерностей и статистики, но совершенно не понимает контекст вашего бизнеса. Например, если в столбце new_campaign_id доля нулевых значений составляет 98%, магистр права может проигнорировать этот столбец по статистическим причинам.
  • Ключевая роль человека: Вы, эксперт-человек, знаете, что поле new_campaign_id было добавлено только вчера в преддверии крупного запуска продукта на следующей неделе. Вы знаете, что сейчас его доля нулевых значений должна быть высокой, но ожидается, что она значительно снизится. Вы также знаете, что после заполнения поле должно соответствовать определенному формату. Искусственный интеллект не может получить эти внешние бизнес-знания. Ваша задача — применить этот бизнес-контекст к статистическим рекомендациям ИИ, переопределяя или дополняя их по мере необходимости.

2. Правильность и точность: защита от иллюзий и незначительных ошибок.

  • Слабое место LLM: LLM могут быть «уверенно неправы». Они могут «галлюцинировать» или генерировать код с едва заметными ошибками. Например, они могут сгенерировать YAML-файл с правильно названным правилом, но недопустимым параметром, или неправильно написать тип правила (например, setExpectations вместо правильного setExpectation ). Эти незаметные ошибки приведут к сбою развертывания, но их трудно обнаружить.
  • Ключевая роль человека: Ваша задача — выступать в роли главного линтера и валидатора схемы. Вы должны тщательно проверять сгенерированный YAML-файл на соответствие официальной спецификации Dataplex DataQualityRule . Вы проверяете не просто, «правильно ли он выглядит», а его синтаксическую и семантическую корректность, чтобы убедиться в 100% соответствии целевому API. Именно поэтому в практическом задании Gemini предлагает ссылаться на файлы схемы — чтобы уменьшить вероятность ошибки, — но окончательная проверка остается за вами.

3. Безопасность и снижение рисков: предотвращение негативных последствий.

  • Слабое место LLM: Неправильное правило качества данных, внедренное в производство, может иметь серьезные последствия. Если ИИ предлагает слишком широкий rangeExpectation для суммы финансовой транзакции, он может не обнаружить мошеннические действия. И наоборот, если он предлагает слишком строгое правило на основе небольшой выборки данных, он может завалить вашу дежурную команду тысячами ложных срабатываний, что приведет к усталости от оповещений и к тому, что реальные проблемы будут упущены.
  • Ключевая роль человека: вы — инженер по безопасности. Вы должны оценить потенциальное влияние каждого правила, предложенного ИИ. Задайте себе вопросы: «Что произойдет, если это правило не сработает? Можно ли предпринять какие-либо действия в связи с этим предупреждением? Каков риск, если это правило будет ошибочно выполнено?» Эта оценка риска — уникальная человеческая способность, которая сопоставляет стоимость сбоя с пользой от проверки.

4. Управление как непрерывный процесс: учет перспективных знаний

  • Слабое место LLM: знания ИИ основаны на статическом снимке данных — профиль формируется в определенный момент времени. Он не обладает информацией о будущих событиях.
  • Ключевая роль человека: ваша стратегия управления должна быть ориентирована на будущее. Вы знаете, что в следующем месяце запланирована миграция источника данных, что изменит stream_id. Вы знаете, что в список geo_country добавляется новая страна. Процесс HITL (Health Instructional Technology) — это процесс внедрения этих знаний о будущем состоянии, обновления или временного отключения правил для предотвращения сбоев во время запланированных бизнес- или технических изменений. Качество данных — это не разовая настройка; это живой процесс, который должен развиваться, и только человек может направлять это развитие.

Вкратце, HITL — это важнейший механизм обеспечения качества и безопасности, который превращает управление на основе ИИ из новой, но рискованной идеи в ответственную, масштабируемую и соответствующую корпоративным стандартам практику. Он гарантирует, что внедряемые в конечном итоге политики не только ускорены ИИ, но и проверены людьми, сочетая скорость машин с мудростью и контекстом экспертов.

Однако такой акцент на человеческом контроле не уменьшает ценность ИИ. Напротив, генеративный ИИ играет решающую роль в ускорении самого процесса HITL.

Без искусственного интеллекта инженеру по обработке данных пришлось бы:

  1. Напишите вручную сложные SQL-запросы для анализа данных (например, COUNT DISTINCT , AVG , MIN , MAX для каждого столбца).
  2. Тщательно проанализируйте результаты по каждой таблице отдельно.
  3. Написать каждую строку файла правил YAML с нуля — утомительная и чреватая ошибками задача.

Искусственный интеллект автоматизирует эти трудоемкие и отнимающие много времени этапы. Он выступает в роли неутомимого аналитика, мгновенно обрабатывающего статистический профиль и предоставляющего хорошо структурированный, на 80% готовый «первый проект» политики.

Это коренным образом меняет характер работы человека. Вместо того чтобы тратить часы на ручную обработку данных и написание шаблонного кода, специалист может немедленно сосредоточиться на наиболее важных задачах:

  • Применение критического бизнес-контекста.
  • Проверка корректности логики ИИ.
  • Принятие стратегических решений о том, какие правила действительно важны.

В этом партнерстве ИИ обрабатывает вопрос «что» (каковы статистические закономерности?), освобождая человека для сосредоточения на вопросе «почему» (почему эта закономерность важна для нашего бизнеса?) и вопросе «что из этого следует» (какой должна быть наша политика?). Таким образом, ИИ не заменяет цикл; он делает каждый цикл быстрее, умнее и эффективнее.

6. Очистка окружающей среды

Чтобы избежать в будущем списания средств с вашего аккаунта Google Cloud за ресурсы, использованные в этом практическом задании, вам следует удалить проект, содержащий эти ресурсы. Однако, если вы хотите сохранить проект, вы можете удалить отдельные созданные вами ресурсы.

Удалите результаты сканирования Dataplex.

Сначала удалите созданные вами профили и результаты сканирования качества. Чтобы предотвратить случайное удаление важных ресурсов, эти команды используют конкретные имена результатов сканирования, созданных в этом практическом задании.

# Delete the Data Quality Scan
gcloud dataplex datascans delete dq-scan \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

# Delete the Data Profile Scans
gcloud dataplex datascans delete profile-scan-mv-ga4-user-session-flat \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

gcloud dataplex datascans delete profile-scan-mv-ga4-ecommerce-transactions \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

gcloud dataplex datascans delete profile-scan-mv-ga4-ecommerce-items \
    --location=us-central1 \
    --project=$PROJECT_ID --quiet

Удалите набор данных BigQuery.

Далее удалите набор данных BigQuery. Эта команда необратима и использует флаг -f (принудительное удаление) для удаления набора данных и всех его таблиц без подтверждения.

# Manually type this command to confirm you are deleting the correct dataset
bq rm -r -f --dataset $PROJECT_ID:dataplex_dq_codelab

7. Поздравляем!

Вы успешно завершили практическое занятие!

Вы разработали комплексный программный рабочий процесс управления данными. Начали с использования материализованных представлений (Materialized Views) для преобразования сложных данных BigQuery в плоскую структуру, что сделало их пригодными для анализа. Затем программно запустили сканирование профилирования Dataplex для генерации статистических метаданных. Что наиболее важно, вы использовали CLI Gemini для анализа результатов профилирования и интеллектуального создания артефакта «политика как код» ( dq_rules.yaml ). Затем с помощью CLI развернули эту конфигурацию в качестве автоматизированного сканирования качества данных, замкнув цикл современной масштабируемой стратегии управления.

Теперь вы обладаете базовыми знаниями для создания надежных, ускоренных с помощью ИИ и проверенных людьми систем обеспечения качества данных в Google Cloud.

Что дальше?

  • Интеграция с CI/CD: Возьмите файл dq_rules.yaml и зафиксируйте его в репозитории Git. Создайте конвейер CI/CD (например, с помощью Cloud Build или GitHub Actions), который автоматически развертывает сканирование Dataplex при каждом обновлении файла правил.
  • Изучите возможности пользовательских SQL-правил: выйдите за рамки стандартных типов правил. Dataplex поддерживает пользовательские SQL-правила для обеспечения более сложной, специфичной для бизнеса логики, которую невозможно выразить с помощью предопределенных проверок. Это мощная функция для адаптации проверки к вашим уникальным требованиям.
  • Оптимизируйте сканирование для повышения эффективности и снижения затрат: для очень больших таблиц можно улучшить производительность и сократить расходы, не всегда сканируя весь набор данных. Рассмотрите возможность использования фильтров для сужения области сканирования до определенных временных рамок или сегментов данных, или настройте выборочное сканирование для проверки репрезентативного процента ваших данных.
  • Визуализируйте результаты: результаты каждого сканирования качества данных Dataplex записываются в таблицу BigQuery. Подключите эту таблицу к Looker Studio, чтобы создавать панели мониторинга, отслеживающие показатели качества данных с течением времени, агрегированные по определенным вами параметрам (например, полнота, достоверность). Это делает мониторинг проактивным и наглядным для всех заинтересованных сторон.
  • Делитесь передовым опытом: поощряйте обмен знаниями внутри вашей организации, чтобы использовать коллективный опыт и улучшить стратегию качества данных. Формирование культуры доверия к данным является ключом к максимальной эффективности ваших усилий по управлению данными.
  • Ознакомьтесь с документацией: