使用支持 AI 的 BigQuery DataFrames 软件包从结构化和非结构化数据中获取数据洞见

1. 概览

在本实验中,您将在 BigQuery Studio 的 Python 笔记本中使用 BigQuery DataFrame,通过 Python 从数据中获取数据洞见。利用 Google 的生成式 AI 分析和可视化非结构化文本数据。

您将创建一个 Python 笔记本,以对公开的客户投诉数据库进行分类和总结。此方法可以适用于任何非结构化文本数据。

目标

在本实验中,您将学习如何执行以下任务:

  • 在 BigQuery Studio 中激活和使用 Python 笔记本
  • 使用 BigQuery DataFrames 软件包连接到 BigQuery
  • 使用 BigQuery ML 从非结构化文本数据创建嵌入,并连接到 Vertex AI 中的文本嵌入端点
  • 使用 BigQuery ML 对嵌入进行聚类
  • 通过 BigQuery ML 使用 LLM 汇总集群

2. 要求

  • 一个浏览器,例如 ChromeFirefox
  • 启用了结算功能的 Google Cloud 项目

准备工作

若要按照本 Codelab 中的说明操作,您需要一个启用了 BigQuery Studio 且已关联结算账号的 Google Cloud 项目。

  1. Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目
  2. 确保您的 Google Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能
  3. 按照说明启用 BigQuery Studio 进行资产管理

准备 BigQuery Studio

创建一个空白的笔记本,并将其连接到运行时。

  1. 在 Google Cloud 控制台中,前往 BigQuery Studio
  2. 点击 + 按钮旁边的
  3. 选择 Python 笔记本
  4. 关闭模板选择器。
  5. 选择 + 代码以创建新的代码单元。
  6. 从代码单元格安装最新版本的 BigQuery DataFrames 软件包。输入以下命令。
    %pip install --upgrade bigframes --quiet
    
    点击 🞂? 按钮或按 Shift + Enter 运行代码单元。

3. 读取公共数据集

在新的代码单元中运行以下代码,初始化 BigQuery DataFrames 软件包:

import bigframes.pandas as bpd

bpd.options.bigquery.ordering_mode = "partial"

注意:在本教程中,我们使用实验性“部分排序模式”,该模式可与类似于 Pandas 的过滤功能搭配使用,从而实现更高效的查询。某些需要严格排序或索引的 pandas 功能可能无法正常运行。

消费者投诉数据库

消费者投诉数据库通过 Google Cloud 的公共数据集计划在 BigQuery 上提供。该数据集收集了有关消费者金融产品和服务的投诉,数据由美国消费者金融保护局收集。

在 BigQuery 中,查询 bigquery-public-data.cfbp_complaints.complaint_database 表,以分析消费者投诉数据库。使用 bigframes.pandas.read_gbq() 方法根据查询字符串或表 ID 创建 DataFrame。

在新的代码单元中运行以下代码,以创建一个名为“feedback”的 DataFrame:

feedback = bpd.read_gbq(
    "bigquery-public-data.cfpb_complaints.complaint_database"
)

了解 DataFrame 的基本信息

使用 DataFrame.peek() 方法下载少量数据样本。

运行此单元

feedback.peek()

预期输出:

  date_received                  product ... timely_response  consumer_disputed complaint_id  
0    2014-03-05  Bank account or service ...            True              False       743665   
1    2014-01-21  Bank account or service ...            True              False       678608   
2    2020-12-31          Debt collection ...            True               <NA>      4041190   
3    2014-02-12          Debt collection ...            True              False       714350   
4    2015-02-23          Debt collection ...            True              False      1251358   

注意:head() 需要排序,并且通常不如 peek() 高效,除非您要可视化数据样本。

与 pandas 一样,您可以使用 DataFrame.dtypes 属性查看所有可用列及其对应的数据类型。这些数据以与 pandas 兼容的方式公开。

运行此单元

feedback.dtypes

预期输出:

date_received                   date32[day][pyarrow]
product                              string[pyarrow]
subproduct                           string[pyarrow]
issue                                string[pyarrow]
subissue                             string[pyarrow]
consumer_complaint_narrative         string[pyarrow]
company_public_response              string[pyarrow]
company_name                         string[pyarrow]
state                                string[pyarrow]
zip_code                             string[pyarrow]
tags                                 string[pyarrow]
consumer_consent_provided            string[pyarrow]
submitted_via                        string[pyarrow]
date_sent_to_company            date32[day][pyarrow]
company_response_to_consumer         string[pyarrow]
timely_response                              boolean
consumer_disputed                            boolean
complaint_id                         string[pyarrow]
dtype: object

DataFrame.describe() 方法会从 DataFrame 中查询一些基本统计信息。由于此 DataFrame 不包含任何数值列,因此它会显示非 null 值的计数和不重复值的数量摘要。

运行此单元

# Exclude some of the larger columns to make the query more efficient.
feedback.drop(columns=[
  "consumer_complaint_narrative",
  "company_public_response",
  "company_response_to_consumer",
]).describe()

预期输出:

         product  subproduct    issue  subissue  company_name    state ... timely_response  consumer_disputed  complaint_id
count    3458906     3223615  3458906   2759004       3458906  3417792 ...         3458906             768399       3458906
nunique       18          76      165       221          6694       63 ...               2                  2       3458906

4. 探索数据

在深入了解实际投诉之前,请对 DataFrame 使用类似 Pandas 的方法来直观呈现数据。

可视化 DataFrame

有几种内置可视化方法,例如 DataFrame.plot.hist()。由于此 DataFrame 主要包含字符串和布尔值数据,因此我们可以先进行一些汇总,以详细了解各个列。

统计收到的来自每个州的投诉数量。

complaints_by_state = (
  feedback.groupby(
    "state", as_index=False,
  ).size()
  .rename(columns={"size": "total_complaints"})
  .sort_values(by="total_complaints", ascending=False)
)

使用 DataFrame.to_pandas() 方法将其转换为 Pandas DataFrame。

complaints_pd = complaints_by_state.head(10).to_pandas()

对此下载的 DataFrame 使用 Pandas 可视化方法。

complaints_pd.plot.bar(x="state", y="total_complaints")

显示加利福尼亚州投诉最多的条形图

与其他数据集联接

以前,您可以查看每个州收到的投诉,但这样会丢失重要背景信息。有些州的人口比其他州多。与人口数据集(例如 美国人口调查局的美国社区调查问卷bigquery-public-data.geo_us_boundaries.states)联接。

us_states = bpd.read_gbq("bigquery-public-data.geo_us_boundaries.states")
us_survey = bpd.read_gbq("bigquery-public-data.census_bureau_acs.state_2020_5yr")

# Ensure there are leading 0s on GEOIDs for consistency across tables.
us_states = us_states.assign(
    geo_id=us_states["geo_id"].str.pad(2, fillchar="0")
)

us_survey = us_survey.assign(
    geo_id=us_survey["geo_id"].str.pad(2, fillchar="0")
)

“美国社区调查”按 GEOID 标识各州。与“states”表联接,按两个字母的州代码获取人口数。

pops = us_states.set_index("geo_id")[["state"]].join(
  us_survey.set_index("geo_id")[["total_pop"]]
)

现在,将其与投诉数据库联接,以便比较人口与投诉数量。

complaints_and_pops = complaints_by_state.set_index("state").join(
    pops.set_index("state")
)

创建散点图,比较各州的人口数量与投诉数量。

(
  complaints_and_pops
  .to_pandas()
  .plot.scatter(x="total_pop", y="total_complaints")
)

比较人口与投诉次数的散点图

将人口数与投诉数量进行比较时,有几个州似乎是离群值。我们将此作为一项练习留给读者来完成,即使用点标签绘制图表来识别这些点。同样,提出一些假设来解释这种情况的原因(例如受众特征不同、金融服务公司数量不同等),并对其进行测试。

5. 计算嵌入

通常,重要信息隐藏在非结构化数据(例如文本、音频或图片)中。在此示例中,投诉数据库中的大部分实用信息都包含在投诉的文本内容中。

AI 和传统技术(例如情感分析、“词袋”和 word2vec)可以从非结构化数据中提取一些定量信息。近来,“矢量嵌入”模型(与 LLM 密切相关)可以创建一系列浮点数,表示文本的语义信息。

选择数据库的一部分

与其他操作相比,运行向量嵌入模型会使用更多资源。为降低费用和避免配额问题,请在本教程的其余部分中选择部分数据。

import bigframes.pandas as bpd

bpd.options.bigquery.ordering_mode = "partial"

feedback = bpd.read_gbq(
    "bigquery-public-data.cfpb_complaints.complaint_database"
)

# Note: if not using ordering_mode = "partial", you must specify these in read_gbq
# for these to affect query efficiency.
# feedback = bpd.read_gbq(
#    "bigquery-public-data.cfpb_complaints.complaint_database",
#     columns=["consumer_complaint_narrative"],
#     filters= [
#         ("consumer_complaint_narrative", "!=", ""),
#         ("date_received", "==", "2022-12-01")])

feedback.shape

2022 年 12 月 1 日提交了约 1,000 条投诉,而整个数据库中总共有近 350 万行(请参阅 feedback.shape)。

仅选择 2022-12-01 的数据,并且仅选择 consumer_complaint_narrative 列。

import datetime

feedback = feedback[
    # Filter rows by passing in a boolean Series.
    (feedback["date_received"] == datetime.date(2022, 12, 1))
    & ~(feedback["date_received"].isnull())
    & ~(feedback["consumer_complaint_narrative"].isnull())
    & (feedback["consumer_complaint_narrative"] != "")
    & (feedback["state"] == "CA")

    # Uncomment the following if using free credits for a workshop.
    # Billing accounts with free credits have limited Vertex AI quota.
    # & (feedback["product"] == "Mortgage")
][
    # Filter columns by passing in a list of strings.
    ["consumer_complaint_narrative"]
]

feedback.shape

pandas 中的 drop_duplicates 方法需要对行进行总排序,因为它会尝试选择第一个或最后一个匹配的行,并保留与其关联的索引。

而是通过调用 groupby 方法进行汇总,以删除重复的行。

feedback = (
  feedback.groupby("consumer_complaint_narrative", as_index=False)
  .size()
)[["consumer_complaint_narrative"]]

feedback.shape

生成嵌入

BigQuery DataFrame 通过 TextEmbeddingGenerator 类生成嵌入向量。这基于 BigQuery ML 中的 ML.GENERATE_EMBEDDING 方法,该方法会调用 Vertex AI 提供的文本嵌入模型

from bigframes.ml.llm import TextEmbeddingGenerator

embedding_model = TextEmbeddingGenerator(
    model_name="text-embedding-004"
)
feedback_embeddings = embedding_model.predict(feedback)

我们来看看嵌入的样子。这些向量表示文本嵌入模型理解的文本的语义含义。

feedback_embeddings.peek()

预期输出:

                        ml_generate_embedding_result  \
0  [ 7.36380890e-02  2.11779331e-03  2.54309829e-...   
1  [-1.10935252e-02 -5.53950183e-02  2.01338865e-...   
2  [-7.85628427e-03 -5.39347418e-02  4.51385677e-...   
3  [ 0.02013054 -0.0224789  -0.00164843  0.011354...   
4  [-1.51684484e-03 -5.02693094e-03  1.72322839e-...   

这些向量具有多个维度。我们来看看单个嵌入矢量:

feedback_embeddings["ml_generate_embedding_result"].peek().iloc[0]

嵌入生成是根据“部分成功”协定进行的。这意味着,某些行可能存在错误,并且不会生成嵌入。'ml_generate_embedding_status' 列会显示错误消息。空表示没有错误。

过滤嵌入,使其仅包含未发生错误的行。

mask = feedback_embeddings["ml_generate_embedding_status"] == ""
valid_embeddings = feedback_embeddings[mask]
valid_embeddings.shape

6. 使用文本嵌入进行分群

现在,使用 k-means 对嵌入进行分组。在本演示中,请使用任意数量的组(也称为质心)。正式版解决方案应使用Silhouette 方法等技术来调整质心点的数量。

from bigframes.ml.cluster import KMeans

num_clusters = 5
cluster_model = KMeans(n_clusters=num_clusters)
cluster_model.fit(valid_embeddings["ml_generate_embedding_result"])
clusters = cluster_model.predict(valid_embeddings)
clusters.peek()

移除所有嵌入失败。

mask = clusters["ml_generate_embedding_status"] == ""
clusters = clusters[mask]

预览并查看每个重心评论的分布情况。

clusters.groupby("CENTROID_ID").size()

7. 汇总集群

提供与每个重心相关的一些评论,并让 Gemini 总结这些投诉。提示工程是一门新兴领域,但互联网上有许多优秀的示例,例如 https://www.promptingguide.ai/。

from bigframes.ml.llm import GeminiTextGenerator

preamble = "What is the main concern in this list of user complaints:"
suffix = "Write the main issue using a formal tone."

# Now let's sample the raw comments and get the LLM to summarize them.
prompts = []
for centroid_id in range(1, num_clusters + 1):
  cluster = clusters[clusters["CENTROID_ID"] == centroid_id]
  comments = "\n".join(["- {0}".format(x) for x in cluster.content.peek(40)])
  prompts.append("{}:\n{}\n{}".format(preamble, comments, suffix))

prompt_df = bpd.DataFrame(prompts)
gemini = GeminiTextGenerator(model_name="gemini-1.5-flash-001")
issues = gemini.predict(X=prompt_df, temperature=0.0)
issues.peek()

使用 Gemini 根据摘要撰写报告。

from IPython.display import display, Markdown

prompt = "Turn this list of issues into a short, concise report:"
for value in issues["ml_generate_text_llm_result"]:
  prompt += "- {}".format(value)
prompt += "Using a formal tone, write a markdown text format report."

summary_df = bpd.DataFrame(([prompt]))
summary = gemini.predict(X=summary_df, temperature=0.0)

report = (summary["ml_generate_text_llm_result"].values[0])
display(Markdown(report))

8. 清理

如果您为本教程创建了新的 Google Cloud 项目,可以将其删除,以免系统因创建的表格或其他资源而向您收取额外费用。

9. 恭喜!

您已使用 BigQuery DataFrame 分析结构化数据和非结构化数据。在学习过程中,您探索了 Google Cloud 的公共数据集、BigQuery Studio 中的 Python 笔记本、BigQuery ML、Vertex AI,以及 BigQuery Studio 的自然语言转换为 Python 功能。真了不起!

后续步骤