1. 概览
在本实验中,您将在 BigQuery Studio 的 Python 笔记本中使用 BigQuery DataFrame,通过 Python 从数据中获取数据洞见。利用 Google 的生成式 AI 分析和可视化非结构化文本数据。
您将创建一个 Python 笔记本,以对公开的客户投诉数据库进行分类和总结。此方法可以适用于任何非结构化文本数据。
目标
在本实验中,您将学习如何执行以下任务:
- 在 BigQuery Studio 中激活和使用 Python 笔记本
- 使用 BigQuery DataFrames 软件包连接到 BigQuery
- 使用 BigQuery ML 从非结构化文本数据创建嵌入,并连接到 Vertex AI 中的文本嵌入端点
- 使用 BigQuery ML 对嵌入进行聚类
- 通过 BigQuery ML 使用 LLM 汇总集群
2. 要求
准备工作
若要按照本 Codelab 中的说明操作,您需要一个启用了 BigQuery Studio 且已关联结算账号的 Google Cloud 项目。
- 在 Google Cloud 控制台的项目选择器页面上,选择或创建一个 Google Cloud 项目
- 确保您的 Google Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能
- 按照说明启用 BigQuery Studio 进行资产管理。
准备 BigQuery Studio
创建一个空白的笔记本,并将其连接到运行时。
- 在 Google Cloud 控制台中,前往 BigQuery Studio。
- 点击 + 按钮旁边的 ▼。
- 选择 Python 笔记本。
- 关闭模板选择器。
- 选择 + 代码以创建新的代码单元。
- 从代码单元格安装最新版本的 BigQuery DataFrames 软件包。输入以下命令。
点击 🞂? 按钮或按 Shift + Enter 运行代码单元。%pip install --upgrade bigframes --quiet
3. 读取公共数据集
在新的代码单元中运行以下代码,初始化 BigQuery DataFrames 软件包:
import bigframes.pandas as bpd
bpd.options.bigquery.ordering_mode = "partial"
注意:在本教程中,我们使用实验性“部分排序模式”,该模式可与类似于 Pandas 的过滤功能搭配使用,从而实现更高效的查询。某些需要严格排序或索引的 pandas 功能可能无法正常运行。
消费者投诉数据库
消费者投诉数据库通过 Google Cloud 的公共数据集计划在 BigQuery 上提供。该数据集收集了有关消费者金融产品和服务的投诉,数据由美国消费者金融保护局收集。
在 BigQuery 中,查询 bigquery-public-data.cfbp_complaints.complaint_database 表,以分析消费者投诉数据库。使用 bigframes.pandas.read_gbq()
方法根据查询字符串或表 ID 创建 DataFrame。
在新的代码单元中运行以下代码,以创建一个名为“feedback”的 DataFrame:
feedback = bpd.read_gbq(
"bigquery-public-data.cfpb_complaints.complaint_database"
)
了解 DataFrame 的基本信息
使用 DataFrame.peek()
方法下载少量数据样本。
运行此单元:
feedback.peek()
预期输出:
date_received product ... timely_response consumer_disputed complaint_id
0 2014-03-05 Bank account or service ... True False 743665
1 2014-01-21 Bank account or service ... True False 678608
2 2020-12-31 Debt collection ... True <NA> 4041190
3 2014-02-12 Debt collection ... True False 714350
4 2015-02-23 Debt collection ... True False 1251358
注意:head()
需要排序,并且通常不如 peek()
高效,除非您要可视化数据样本。
与 pandas 一样,您可以使用 DataFrame.dtypes
属性查看所有可用列及其对应的数据类型。这些数据以与 pandas 兼容的方式公开。
运行此单元:
feedback.dtypes
预期输出:
date_received date32[day][pyarrow]
product string[pyarrow]
subproduct string[pyarrow]
issue string[pyarrow]
subissue string[pyarrow]
consumer_complaint_narrative string[pyarrow]
company_public_response string[pyarrow]
company_name string[pyarrow]
state string[pyarrow]
zip_code string[pyarrow]
tags string[pyarrow]
consumer_consent_provided string[pyarrow]
submitted_via string[pyarrow]
date_sent_to_company date32[day][pyarrow]
company_response_to_consumer string[pyarrow]
timely_response boolean
consumer_disputed boolean
complaint_id string[pyarrow]
dtype: object
DataFrame.describe()
方法会从 DataFrame 中查询一些基本统计信息。由于此 DataFrame 不包含任何数值列,因此它会显示非 null 值的计数和不重复值的数量摘要。
运行此单元:
# Exclude some of the larger columns to make the query more efficient.
feedback.drop(columns=[
"consumer_complaint_narrative",
"company_public_response",
"company_response_to_consumer",
]).describe()
预期输出:
product subproduct issue subissue company_name state ... timely_response consumer_disputed complaint_id
count 3458906 3223615 3458906 2759004 3458906 3417792 ... 3458906 768399 3458906
nunique 18 76 165 221 6694 63 ... 2 2 3458906
4. 探索数据
在深入了解实际投诉之前,请对 DataFrame 使用类似 Pandas 的方法来直观呈现数据。
可视化 DataFrame
有几种内置可视化方法,例如 DataFrame.plot.hist()。由于此 DataFrame 主要包含字符串和布尔值数据,因此我们可以先进行一些汇总,以详细了解各个列。
统计收到的来自每个州的投诉数量。
complaints_by_state = (
feedback.groupby(
"state", as_index=False,
).size()
.rename(columns={"size": "total_complaints"})
.sort_values(by="total_complaints", ascending=False)
)
使用 DataFrame.to_pandas()
方法将其转换为 Pandas DataFrame。
complaints_pd = complaints_by_state.head(10).to_pandas()
对此下载的 DataFrame 使用 Pandas 可视化方法。
complaints_pd.plot.bar(x="state", y="total_complaints")
与其他数据集联接
以前,您可以查看每个州收到的投诉,但这样会丢失重要背景信息。有些州的人口比其他州多。与人口数据集(例如 美国人口调查局的美国社区调查问卷和 bigquery-public-data.geo_us_boundaries.states
表)联接。
us_states = bpd.read_gbq("bigquery-public-data.geo_us_boundaries.states")
us_survey = bpd.read_gbq("bigquery-public-data.census_bureau_acs.state_2020_5yr")
# Ensure there are leading 0s on GEOIDs for consistency across tables.
us_states = us_states.assign(
geo_id=us_states["geo_id"].str.pad(2, fillchar="0")
)
us_survey = us_survey.assign(
geo_id=us_survey["geo_id"].str.pad(2, fillchar="0")
)
“美国社区调查”按 GEOID 标识各州。与“states”表联接,按两个字母的州代码获取人口数。
pops = us_states.set_index("geo_id")[["state"]].join(
us_survey.set_index("geo_id")[["total_pop"]]
)
现在,将其与投诉数据库联接,以便比较人口与投诉数量。
complaints_and_pops = complaints_by_state.set_index("state").join(
pops.set_index("state")
)
创建散点图,比较各州的人口数量与投诉数量。
(
complaints_and_pops
.to_pandas()
.plot.scatter(x="total_pop", y="total_complaints")
)
将人口数与投诉数量进行比较时,有几个州似乎是离群值。我们将此作为一项练习留给读者来完成,即使用点标签绘制图表来识别这些点。同样,提出一些假设来解释这种情况的原因(例如受众特征不同、金融服务公司数量不同等),并对其进行测试。
5. 计算嵌入
通常,重要信息隐藏在非结构化数据(例如文本、音频或图片)中。在此示例中,投诉数据库中的大部分实用信息都包含在投诉的文本内容中。
AI 和传统技术(例如情感分析、“词袋”和 word2vec)可以从非结构化数据中提取一些定量信息。近来,“矢量嵌入”模型(与 LLM 密切相关)可以创建一系列浮点数,表示文本的语义信息。
选择数据库的一部分
与其他操作相比,运行向量嵌入模型会使用更多资源。为降低费用和避免配额问题,请在本教程的其余部分中选择部分数据。
import bigframes.pandas as bpd
bpd.options.bigquery.ordering_mode = "partial"
feedback = bpd.read_gbq(
"bigquery-public-data.cfpb_complaints.complaint_database"
)
# Note: if not using ordering_mode = "partial", you must specify these in read_gbq
# for these to affect query efficiency.
# feedback = bpd.read_gbq(
# "bigquery-public-data.cfpb_complaints.complaint_database",
# columns=["consumer_complaint_narrative"],
# filters= [
# ("consumer_complaint_narrative", "!=", ""),
# ("date_received", "==", "2022-12-01")])
feedback.shape
2022 年 12 月 1 日提交了约 1,000 条投诉,而整个数据库中总共有近 350 万行(请参阅 feedback.shape
)。
仅选择 2022-12-01 的数据,并且仅选择 consumer_complaint_narrative
列。
import datetime
feedback = feedback[
# Filter rows by passing in a boolean Series.
(feedback["date_received"] == datetime.date(2022, 12, 1))
& ~(feedback["date_received"].isnull())
& ~(feedback["consumer_complaint_narrative"].isnull())
& (feedback["consumer_complaint_narrative"] != "")
& (feedback["state"] == "CA")
# Uncomment the following if using free credits for a workshop.
# Billing accounts with free credits have limited Vertex AI quota.
# & (feedback["product"] == "Mortgage")
][
# Filter columns by passing in a list of strings.
["consumer_complaint_narrative"]
]
feedback.shape
pandas 中的 drop_duplicates
方法需要对行进行总排序,因为它会尝试选择第一个或最后一个匹配的行,并保留与其关联的索引。
而是通过调用 groupby
方法进行汇总,以删除重复的行。
feedback = (
feedback.groupby("consumer_complaint_narrative", as_index=False)
.size()
)[["consumer_complaint_narrative"]]
feedback.shape
生成嵌入
BigQuery DataFrame 通过 TextEmbeddingGenerator 类生成嵌入向量。这基于 BigQuery ML 中的 ML.GENERATE_EMBEDDING
方法,该方法会调用 Vertex AI 提供的文本嵌入模型。
from bigframes.ml.llm import TextEmbeddingGenerator
embedding_model = TextEmbeddingGenerator(
model_name="text-embedding-004"
)
feedback_embeddings = embedding_model.predict(feedback)
我们来看看嵌入的样子。这些向量表示文本嵌入模型理解的文本的语义含义。
feedback_embeddings.peek()
预期输出:
ml_generate_embedding_result \
0 [ 7.36380890e-02 2.11779331e-03 2.54309829e-...
1 [-1.10935252e-02 -5.53950183e-02 2.01338865e-...
2 [-7.85628427e-03 -5.39347418e-02 4.51385677e-...
3 [ 0.02013054 -0.0224789 -0.00164843 0.011354...
4 [-1.51684484e-03 -5.02693094e-03 1.72322839e-...
这些向量具有多个维度。我们来看看单个嵌入矢量:
feedback_embeddings["ml_generate_embedding_result"].peek().iloc[0]
嵌入生成是根据“部分成功”协定进行的。这意味着,某些行可能存在错误,并且不会生成嵌入。'ml_generate_embedding_status'
列会显示错误消息。空表示没有错误。
过滤嵌入,使其仅包含未发生错误的行。
mask = feedback_embeddings["ml_generate_embedding_status"] == ""
valid_embeddings = feedback_embeddings[mask]
valid_embeddings.shape
6. 使用文本嵌入进行分群
现在,使用 k-means 对嵌入进行分组。在本演示中,请使用任意数量的组(也称为质心)。正式版解决方案应使用Silhouette 方法等技术来调整质心点的数量。
from bigframes.ml.cluster import KMeans
num_clusters = 5
cluster_model = KMeans(n_clusters=num_clusters)
cluster_model.fit(valid_embeddings["ml_generate_embedding_result"])
clusters = cluster_model.predict(valid_embeddings)
clusters.peek()
移除所有嵌入失败。
mask = clusters["ml_generate_embedding_status"] == ""
clusters = clusters[mask]
预览并查看每个重心评论的分布情况。
clusters.groupby("CENTROID_ID").size()
7. 汇总集群
提供与每个重心相关的一些评论,并让 Gemini 总结这些投诉。提示工程是一门新兴领域,但互联网上有许多优秀的示例,例如 https://www.promptingguide.ai/。
from bigframes.ml.llm import GeminiTextGenerator
preamble = "What is the main concern in this list of user complaints:"
suffix = "Write the main issue using a formal tone."
# Now let's sample the raw comments and get the LLM to summarize them.
prompts = []
for centroid_id in range(1, num_clusters + 1):
cluster = clusters[clusters["CENTROID_ID"] == centroid_id]
comments = "\n".join(["- {0}".format(x) for x in cluster.content.peek(40)])
prompts.append("{}:\n{}\n{}".format(preamble, comments, suffix))
prompt_df = bpd.DataFrame(prompts)
gemini = GeminiTextGenerator(model_name="gemini-1.5-flash-001")
issues = gemini.predict(X=prompt_df, temperature=0.0)
issues.peek()
使用 Gemini 根据摘要撰写报告。
from IPython.display import display, Markdown
prompt = "Turn this list of issues into a short, concise report:"
for value in issues["ml_generate_text_llm_result"]:
prompt += "- {}".format(value)
prompt += "Using a formal tone, write a markdown text format report."
summary_df = bpd.DataFrame(([prompt]))
summary = gemini.predict(X=summary_df, temperature=0.0)
report = (summary["ml_generate_text_llm_result"].values[0])
display(Markdown(report))
8. 清理
如果您为本教程创建了新的 Google Cloud 项目,可以将其删除,以免系统因创建的表格或其他资源而向您收取额外费用。
9. 恭喜!
您已使用 BigQuery DataFrame 分析结构化数据和非结构化数据。在学习过程中,您探索了 Google Cloud 的公共数据集、BigQuery Studio 中的 Python 笔记本、BigQuery ML、Vertex AI,以及 BigQuery Studio 的自然语言转换为 Python 功能。真了不起!
后续步骤
- 尝试在笔记本中生成 Python 代码。BigQuery Studio 中的 Python 笔记本由 Colab Enterprise 提供支持。提示:我发现让系统帮忙生成测试数据非常有用。
- 在 GitHub 上浏览 BigQuery DataFrames 示例笔记本。
- 创建在 BigQuery Studio 中运行笔记本的安排。
- 部署使用 BigQuery DataFrames 的远程函数,以将第三方 Python 软件包与 BigQuery 集成。