1. 概览
在本实验中,您将直接提取 BigQuery 数据集,并在 Google Cloud AI Platform 上使用 TensorFlow 企业版训练欺诈检测模型。
学习内容
您将了解如何:
- 在 BigQuery 上分析数据
- 使用 TensorFlow 企业版 中的 BigQuery 连接器注入数据
- 构建深度学习模型,以使用不平衡的数据集检测欺诈行为
2. 在 BigQuery 中分析数据
您需要一个启用了结算功能的 Google Cloud Platform 项目才能运行此 Codelab。如需创建项目,请按照此处的说明操作。
第 1 步:访问 BigQuery 公共数据集
点击此链接,在 Google Cloud 控制台中访问 BigQuery 公共数据集。
在左下角的“资源树”中,您会看到一个数据集列表。浏览可用的数据集,直到找到 ml-datasets,然后选择其中的 ulb-fraud-detection 表:

点击各标签页,详细了解数据集:
- 架构标签页描述了数据类型。
- 详细信息标签页显示,这是一个不平衡的数据集,包含 284,407 笔交易,其中 492 笔是欺诈交易。
- 预览标签页会显示数据集中的记录。
第 2 步:查询表格
“详细信息”标签页会显示以下数据相关信息:
- 时间是指数据集中的第一笔交易与所选交易之间的时间间隔(以秒为单位)。
- V1-V28 是通过一种称为 PCA 的降维技术转换的列,该技术可对数据进行匿名化处理。
- 金额是交易金额。
点击查询表格即可运行查询,以便更详细地了解相关信息:

更新语句以添加 * 来查看所有列,然后点击运行。
SELECT * FROM `bigquery-public-data.ml_datasets.ulb_fraud_detection` LIMIT 1000
第 3 步:分析数据
BigQuery 提供了许多统计函数。我们来看看数据与目标变量 Class 之间的相关性。
SELECT CORR(Time,Class) as TimeCorr, CORR(V1,Class) as V1Corr, CORR(V2,Class) as V2Corr, CORR(Amount,Class) as AmountCorr FROM `bigquery-public-data.ml_datasets.ulb_fraud_detection`

相关性将提供一个范围,从 -1(负相关)到 1(正相关),0 表示独立。
请注意,V1 和 V2 与我们的目标变量略有相关性(分别为 -0.1 和 0.1 左右)。
我们发现,与时间的相关性并不高。略微负相关的结果可能表明,数据集中的欺诈性交易随着时间的推移而减少。
金额的相关性甚至更低,这表明交易金额越高,欺诈性交易的可能性就越略高。
第 4 步:计算特征缩放的平均值
对特征值进行归一化处理有助于神经网络更快地收敛。一种常见的方案是将值以 0 为中心,标准差为 1。以下查询将检索平均值。无需保存结果,因为我们稍后会提供相应的代码段。
您还会注意到,该查询包含一个有趣的 WHERE 子句。我们将在下一部分中介绍这一点,届时我们将介绍如何在训练集和测试集之间拆分数据。
SELECT
AVG(Time), AVG(V1), AVG(V2), AVG(V3), AVG(V4), AVG(V5), AVG(V6), AVG(V7), AVG(V8),
AVG(V9), AVG(V10),AVG(V11), AVG(V12), AVG(V13), AVG(V14), AVG(V15), AVG(V16),
AVG(V17), AVG(V18), AVG(V19), AVG(V20), AVG(V21), AVG(V22), AVG(V23), AVG(V24),
AVG(V25), AVG(V26), AVG(V27),AVG(V28), AVG(Amount)
FROM
`bigquery-public-data.ml_datasets.ulb_fraud_detection`
WHERE
MOD(ABS(FARM_FINGERPRINT(CONCAT(SAFE_CAST(Time AS STRING),
SAFE_CAST(Amount AS STRING)))),10) < 8
第 5 步:拆分数据
在构建机器学习模型时,通常会使用 3 个数据集:
- 训练:通过迭代调整参数来构建模型
- 验证:用于在训练过程中通过验证独立数据来评估模型是否过拟合
- 测试:在创建模型后使用,用于评估准确率
在此 Codelab 中,我们将使用 80/10/10 的训练/验证/测试拆分。
我们将把每个数据集都放入 BigQuery 中自己的表中。第一步是创建一个 BigQuery“数据集”,它是相关表的容器。选择项目后,选择创建数据集。

然后,创建一个名为 tfe_codelab 的数据集,用于包含训练、验证和测试表。

现在,我们将针对训练、测试和验证运行 3 个查询,并将数据保存在新的 tfe_codelab 数据集中。
在查询编辑器中,运行查询以生成训练数据:
SELECT *
FROM `bigquery-public-data.ml_datasets.ulb_fraud_detection`
WHERE MOD(ABS(FARM_FINGERPRINT(CONCAT(SAFE_CAST(Time AS STRING),SAFE_CAST(Amount AS STRING)))),10) < 8
查询完成后,将结果保存到 BigQuery 表中。

在您刚刚创建的 tfe_codelab 数据集中,将表命名为 ulb_fraud_detection_train 并保存数据。

WHERE 子句首先通过计算几个列的哈希值来拆分数据。然后,它会选择哈希除以 10 后余数小于 80 的行,从而获得 80% 的数据。
现在,我们针对验证集和测试集重复执行相同的流程,使用类似的查询各选择 10% 的数据。
验证
SELECT *
FROM `bigquery-public-data.ml_datasets.ulb_fraud_detection`
WHERE MOD(ABS(FARM_FINGERPRINT(CONCAT(SAFE_CAST(Time AS STRING),SAFE_CAST(Amount AS STRING)))),10) = 8
将此查询的结果保存到名为 ulb_fraud_detection_val 的表中。
测试
SELECT *
FROM `bigquery-public-data.ml_datasets.ulb_fraud_detection`
WHERE MOD(ABS(FARM_FINGERPRINT(CONCAT(SAFE_CAST(Time AS STRING),SAFE_CAST(Amount AS STRING)))),10) = 9
将此查询的结果保存到名为 ulb_fraud_detection_test 的表中。
3. 设置笔记本环境
现在,我们已经简要介绍了数据,接下来设置模型开发环境。
第 1 步:启用 API
BigQuery 连接器使用 BigQuery Storage API。在控制台中搜索 BigQuery Storage API,如果该 API 目前处于停用状态,请将其启用。

第 2 步:创建 AI Platform Notebooks 实例
前往 Cloud 控制台的 AI Platform Notebooks 部分,然后点击新建实例。然后选择最新的不带 GPU 的 TensorFlow 企业版 1.x 实例类型:
使用默认选项,然后点击创建。创建实例后,选择打开 JupyterLab:

然后,从 JupyterLab 创建 Python 3 笔记本:

4. 从 BigQuery 提取记录
第 1 步:导入 Python 软件包
在笔记本的第一个单元中,添加以下导入并运行该单元。您可以通过按顶部菜单中的向右箭头按钮或按 Command-Enter 来运行它:
import tensorflow as tf
import tensorflow.keras as keras
import tensorflow.keras.layers as layers
from tensorflow_io.bigquery import BigQueryClient
import functools
tf.enable_eager_execution()
第 2 步:定义常量
接下来,我们来定义一些将在项目中使用的常量。将 GCP_PROJECT_ID 更改为您正在使用的实际项目 ID。创建新单元格后,请立即运行。
GCP_PROJECT_ID = '<YOUR_PROJECT_ID>'
DATASET_GCP_PROJECT_ID = GCP_PROJECT_ID # A copy of the data is saved in the user project
DATASET_ID = 'tfe_codelab'
TRAIN_TABLE_ID = 'ulb_fraud_detection_train'
VAL_TABLE_ID = 'ulb_fraud_detection_val'
TEST_TABLE_ID = 'ulb_fraud_detection_test'
FEATURES = ['Time','V1','V2','V3','V4','V5','V6','V7','V8','V9','V10','V11','V12','V13','V14','V15','V16','V17','V18','V19','V20','V21','V22','V23','V24','V25','V26','V27','V28','Amount']
LABEL='Class'
DTYPES=[tf.float64] * len(FEATURES) + [tf.int64]
第 3 步:定义辅助函数
现在,我们来定义几个函数。read_session() 用于从 BigQuery 表中读取数据。extract_labels() 是一个辅助函数,用于将标签列与其余部分分开,以便数据集采用 keras.model_fit() 稍后所需的格式。
client = BigQueryClient()
def read_session(TABLE_ID):
return client.read_session(
"projects/" + GCP_PROJECT_ID, DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
FEATURES + [LABEL], DTYPES, requested_streams=2
)
def extract_labels(input_dict):
features = dict(input_dict)
label = tf.cast(features.pop(LABEL), tf.float64)
return (features, label)
第 4 步:提取数据
最后,我们来创建每个数据集,然后从训练数据集中打印第一个批次。请注意,我们将 BATCH_SIZE 定义为 32。这是一个重要参数,会影响训练速度和准确性。
BATCH_SIZE = 32
raw_train_data = read_session(TRAIN_TABLE_ID).parallel_read_rows().map(extract_labels).batch(BATCH_SIZE)
raw_val_data = read_session(VAL_TABLE_ID).parallel_read_rows().map(extract_labels).batch(BATCH_SIZE)
raw_test_data = read_session(TEST_TABLE_ID).parallel_read_rows().map(extract_labels).batch(BATCH_SIZE)
next(iter(raw_train_data)) # Print first batch
5. 构建模型
第 1 步:预处理数据
我们来为数据集中的每个特征创建特征列。在此特定数据集中,所有列均为 numeric_column 类型,但还有许多其他列类型(例如 categorical_column)。
正如我们之前讨论的那样,我们还将对数据进行归一化处理,使其以零为中心,以便网络更快地收敛。我们已预先计算了每项特征的平均值,以用于此计算。
MEANS = [94816.7387536405, 0.0011219465482001268, -0.0021445914636999603, -0.002317402958335562,
-0.002525792169927835, -0.002136576923287782, -3.7586818983702984, 8.135919975738768E-4,
-0.0015535579268265718, 0.001436137140461279, -0.0012193712736681508, -4.5364970422902533E-4,
-4.6175444671576083E-4, 9.92177789685366E-4, 0.002366229151475428, 6.710217226762278E-4,
0.0010325807119864225, 2.557260815835395E-4, -2.0804190062322664E-4, -5.057391100818653E-4,
-3.452114767842334E-6, 1.0145936326270006E-4, 3.839214074518535E-4, 2.2061197469126577E-4,
-1.5601580596677608E-4, -8.235017846415852E-4, -7.298316615408554E-4, -6.898459943652376E-5,
4.724125688297753E-5, 88.73235686453587]
def norm_data(mean, data):
data = tf.cast(data, tf.float32) * 1/(2*mean)
return tf.reshape(data, [-1, 1])
numeric_columns = []
for i, feature in enumerate(FEATURES):
num_col = tf.feature_column.numeric_column(feature, normalizer_fn=functools.partial(norm_data, MEANS[i]))
numeric_columns.append(num_col)
numeric_columns
第 2 步:构建模型
现在,我们准备好创建模型了。我们将刚刚创建的列馈送到网络中。然后,我们将编译模型。我们纳入了精确率/召回率 AUC 指标,该指标适用于不平衡的数据集。
model = keras.Sequential([
tf.keras.layers.DenseFeatures(numeric_columns),
layers.Dense(64, activation='relu'),
layers.Dense(64, activation='relu'),
layers.Dense(1, activation='sigmoid')
])
model.compile(loss='binary_crossentropy',
optimizer='adam',
metrics=['accuracy', tf.keras.metrics.AUC(curve='PR')])
第 3 步:训练模型
有多种方法可以处理不平衡的数据,包括过采样(在少数类中生成新数据)和欠采样(减少多数类中的数据)。
在本 Codelab 中,我们使用一种在错误分类少数类时会过度加权损失的技术。我们将在训练时指定 class_weight 参数,并为“1”(欺诈)设置更高的权重,因为它的普遍程度要低得多。
在本实验中,我们将使用 3 个周期(遍历数据),以便更快地完成训练。在实际应用场景中,我们希望运行足够长的时间,直到验证集的准确率不再提高为止。
CLASS_WEIGHT = {
0: 1,
1: 100
}
EPOCHS = 3
train_data = raw_train_data.shuffle(10000)
val_data = raw_val_data
test_data = raw_test_data
model.fit(train_data, validation_data=val_data, class_weight=CLASS_WEIGHT, epochs=EPOCHS)
第 4 步:评估模型
evaluate() 函数可应用于模型从未见过的测试数据,以提供客观的评估。幸运的是,我们预留了专门用于测试的数据!
model.evaluate(test_data)
第 5 步:探索
在本实验中,我们演示了如何将大型数据集从 BigQuery 直接注入到 TensorFlow Keras 模型中。我们还介绍了构建模型的所有步骤。最后,我们简要了解了如何处理不平衡的分类问题。
您可以继续尝试不同的架构和方法来处理不平衡数据集,看看能否提高准确率!
6. 清理
如果您想继续使用此笔记本电脑,建议您在不使用时将其关闭。在 Cloud 控制台的笔记本界面中,选择笔记本,然后选择停止:

如果您想删除在本实验中创建的所有资源,只需删除笔记本实例,而不是停止它。
使用 Cloud 控制台中的导航菜单,浏览到“存储空间”,然后删除您创建的用于存储模型资产的两个存储分区。
