স্পার্কের সাথে ডেটা সায়েন্স

১. সংক্ষিপ্ত বিবরণ

TheLook, একটি কাল্পনিক ই-কমার্স পোশাক বিক্রেতা প্রতিষ্ঠান, BigQuery-তে গ্রাহক, পণ্য, অর্ডার, লজিস্টিকস, ওয়েব ইভেন্ট এবং ডিজিটাল মার্কেটিং ক্যাম্পেইন সম্পর্কিত ডেটা সংরক্ষণ করে। কোম্পানিটি Apache Spark ব্যবহার করে এই ডেটা বিশ্লেষণ করার জন্য টিমের বিদ্যমান SQL এবং PySpark দক্ষতাকে কাজে লাগাতে চায়।

Spark-এর জন্য ম্যানুয়াল ইনফ্রাস্ট্রাকচার প্রোভিশনিং বা টিউনিং এড়াতে, TheLook এমন একটি অটো-স্কেলিং সলিউশন খুঁজছে যা তাদেরকে ক্লাস্টার ম্যানেজমেন্টের পরিবর্তে ওয়ার্কলোডের উপর মনোযোগ দিতে সাহায্য করবে। এছাড়াও, তারা BigQuery Studio এনভায়রনমেন্টের মধ্যেই Spark এবং BigQuery ইন্টিগ্রেট করার জন্য প্রয়োজনীয় প্রচেষ্টা কমাতে চায়।

এই ল্যাবে, আপনি পাইস্পার্ক (PySpark) ব্যবহার করে একটি লজিস্টিক রিগ্রেশন ক্লাসিফায়ার (Logistic Regression Classifier) ​​তৈরি করবেন এবং ডেটা বিশ্লেষণের জন্য বিগকোয়েরি স্টুডিও (BigQuery Studio)-এর নেটিভ নোটবুক ইন্টিগ্রেশন ও এআই-ফিচারগুলো কাজে লাগিয়ে ভবিষ্যদ্বাণী করবেন যে একজন ব্যবহারকারী কোনো কেনাকাটা করবেন কি না। আপনি এই মডেলটিকে একটি ইনফারেন্স সার্ভারে ডেপ্লয় করবেন এবং স্বাভাবিক ভাষা ব্যবহার করে মডেলটিকে কোয়েরি করার জন্য একটি এজেন্ট তৈরি করবেন।

পূর্বশর্ত

এই ল্যাবটি শুরু করার আগে আপনার নিম্নলিখিত বিষয়গুলি সম্পর্কে পরিচিত থাকা উচিত:

  • বেসিক SQL এবং পাইথন প্রোগ্রামিং।
  • জুপিটার নোটবুকে পাইথন কোড চালানো হচ্ছে।
  • ডিস্ট্রিবিউটেড কম্পিউটিং সম্পর্কে একটি প্রাথমিক ধারণা

উদ্দেশ্য

  • ডেটা সায়েন্স ওয়ার্কফ্লো চালানোর জন্য BigQuery Studio নোটবুক ব্যবহার করুন।
  • Google Cloud Serverless for Apache Spark এবং Spark Connect দ্বারা চালিত টুল ব্যবহার করে Apache Spark-এর সাথে একটি সংযোগ তৈরি করুন।
  • লাইটেনিং ইঞ্জিন ব্যবহার করে অ্যাপাচি স্পার্ক ওয়ার্কলোডের গতি ৪.৩ গুণ পর্যন্ত বৃদ্ধি করুন।
  • Apache Spark এবং BigQuery-এর মধ্যে অন্তর্নির্মিত ইন্টিগ্রেশন ব্যবহার করে BigQuery থেকে ডেটা লোড করুন।
  • জেমিনি-সহায়তায় কোড জেনারেশন ব্যবহার করে ডেটা অন্বেষণ করুন।
  • অ্যাপাচি স্পার্কের ডেটা প্রসেসিং ফ্রেমওয়ার্ক ব্যবহার করে ফিচার ইঞ্জিনিয়ারিং সম্পাদন করুন।
  • অ্যাপাচি স্পার্কের নিজস্ব মেশিন লার্নিং লাইব্রেরি, MLlib ব্যবহার করে একটি ক্লাসিফিকেশন মডেলকে প্রশিক্ষণ ও মূল্যায়ন করুন।
  • Flask এবং Cloud Run ব্যবহার করে ক্লাসিফিকেশন মডেলের জন্য একটি ইনফারেন্স সার্ভার স্থাপন করুন।
  • এজেন্ট ইঞ্জিন এবং এজেন্ট ডেভেলপমেন্ট কিট (ADK) ব্যবহার করে স্বাভাবিক ভাষায় ইনফারেন্স সার্ভারকে কোয়েরি করার জন্য একটি এজেন্ট স্থাপন করুন।

২. একটি কোলাব রানটাইম এনভায়রনমেন্টের সাথে সংযোগ করুন

একটি গুগল ক্লাউড প্রজেক্ট শনাক্ত করুন

একটি গুগল ক্লাউড প্রজেক্ট তৈরি করুন । আপনি চাইলে বিদ্যমান কোনো প্রজেক্টও ব্যবহার করতে পারেন।

নিম্নলিখিত API গুলি সক্রিয় করতে এখানে ক্লিক করুন:

  1. গুগল ক্লাউড কনসোলে, নেভিগেশন মেনু > BigQuery-তে যান।

গুগল ক্লাউড কনসোলে একটি তীরচিহ্ন BigQuery ট্যাবের দিকে নির্দেশ করছে।

  1. BigQuery Studio প্যানে, ড্রপডাউন অ্যারো বোতামে ক্লিক করুন, Notebook-এর উপর মাউস রাখুন এবং তারপর Upload নির্বাচন করুন।

11fd85757040c058.png

  1. URL রেডিও বাটনটি সিলেক্ট করুন এবং নিম্নলিখিত URL-টি ইনপুট করুন:
https://github.com/GoogleCloudPlatform/devrel-demos/blob/main/data-analytics/dataproc-webinar/data-science/Spark_Data_Science.ipynb
  1. অঞ্চলটি us-central11 এ সেট করুন এবং আপলোড-এ ক্লিক করুন।

1f2743e9f0a37b3c.png

  1. নোটবুকটি খোলার জন্য, এক্সপ্লোরার প্যানে আপনার প্রজেক্ট-আইডি নামের ড্রপডাউন অ্যারোতে ক্লিক করুন। এরপর নোটবুকস -এর ড্রপডাউনে ক্লিক করুন। Spark_Data_Science নোটবুকটিতে ক্লিক করুন।

aef016c292c8382.png

  1. আরও জায়গা পেতে BigQuery নেভিগেশন মেনু এবং নোটবুকের সূচিপত্র সংকুচিত করুন।

1c4b49de92ade1d9.png

৩. রানটাইমের সাথে সংযোগ স্থাপন করুন এবং অতিরিক্ত সেটআপ কোড চালান।

  1. কানেক্ট-এ ক্লিক করুন। পপ-আপে, আপনার ইমেল অ্যাকাউন্ট দিয়ে কোলাব এন্টারপ্রাইজকে অনুমোদন দিন। আপনার নোটবুকটি স্বয়ংক্রিয়ভাবে একটি রানটাইমের সাথে সংযুক্ত হবে।

995465ba6dbfa550.png

  1. রানটাইম প্রতিষ্ঠিত হয়ে গেলে, আপনি নিম্নলিখিত বিষয়গুলো দেখতে পাবেন:

7f917e7c54a84c91.png

  1. নোটবুকের ভেতরে, সেটআপ সেকশন পর্যন্ত স্ক্রল করুন। এখান থেকে শুরু করুন।

৪. সেটআপ কোড চালান

ল্যাবটি সম্পন্ন করার জন্য প্রয়োজনীয় পাইথন লাইব্রেরিগুলো দিয়ে আপনার পরিবেশ কনফিগার করুন। প্রাইভেট গুগল অ্যাক্সেস কনফিগার করুন । একটি স্টোরেজ বাকেট তৈরি করুন। একটি বিগকোয়েরি ডেটাসেট তৈরি করুন। আপনার প্রজেক্ট আইডি নোটবুকে কপি করুন। একটি অঞ্চল নির্বাচন করুন। এই ল্যাবের জন্য, us-central1 অঞ্চলটি ব্যবহার করুন।

সেল ব্লকের ভিতরে কার্সর রেখে তীরচিহ্নে ক্লিক করে আপনি কোড সেলটি কার্যকর করতে পারেন।

9b8ccb7d6016ebb9.png

# Enable APIs
import subprocess

command = [
    "gcloud",
    "services",
    "enable",
    "aiplatform.googleapis.com",
    "bigquery.googleapis.com",
    "bigquerystorage.googleapis.com",
    "bigqueryunified.googleapis.com",
    "cloudaicompanion.googleapis.com",
    "dataproc.googleapis.com",
    "run.googleapis.com",
    "storage.googleapis.com"
]

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

# Configure a PROJECT_ID and REGION
PROJECT_ID = "<YOUR_PROJECT_ID>"
REGION = "<YOUR_REGION>"

# Enable Private Google Access
import subprocess

command = [
    "gcloud",
    "compute",
    "networks",
    "subnets",
    "update",
    "default",
    f"--region={REGION}",
    "--enable-private-ip-google-access"
]

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

# Create a Cloud Storage Bucket
from google.cloud import storage
from google.cloud.exceptions import NotFound

BUCKET_NAME = f"{PROJECT_ID}-demo"

storage_client = storage.Client(project=PROJECT_ID)
try:
    bucket = storage_client.get_bucket(BUCKET_NAME)
    print(f"Bucket {BUCKET_NAME} already exists.")
except NotFound:
    bucket = storage_client.create_bucket(BUCKET_NAME, location=REGION)
    print(f"Bucket {BUCKET_NAME} created.")


# Create a BigQuery dataset.
from google.cloud import bigquery

DATASET_ID = f"{PROJECT_ID}.demo"

client = bigquery.Client()

dataset = bigquery.Dataset(DATASET_ID)

dataset.location = REGION

dataset = client.create_dataset(dataset, exists_ok=True)

৫. অ্যাপাচি স্পার্কের জন্য গুগল ক্লাউড সার্ভারলেস-এর সাথে একটি সংযোগ তৈরি করুন।

Spark Connect ব্যবহার করে, আপনি ইন্টারেক্টিভ Spark জব চালানোর জন্য একটি সার্ভারলেস Spark সেশনে সংযুক্ত হন। উন্নত Spark পারফরম্যান্সের জন্য আপনি Lightning Engine দিয়ে আপনার রানটাইম কনফিগার করেন। Lightning Engine, Apache Gluten এবং Velox ব্যবহার করে ওয়ার্কলোডকে ত্বরান্বিত করার মাধ্যমে কাজ করে।

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session = Session()

session.runtime_config.version = "3.0"

# You can optionally configure Spark properties as well. See https://cloud.google.com/dataproc-serverless/docs/concepts/properties.
session.runtime_config.properties = {
  "dataproc.runtime": "premium",
  "spark.dataproc.engine": "lightningEngine",
}

# To avoid going over quota in this demo, cap the max number of Spark workers.
session.runtime_config.properties = {
    "spark.dynamicAllocation.maxExecutors": "4"
}

spark = (
    DataprocSparkSession.builder
      .appName("CustomSparkSession")
      .dataprocSessionConfig(session)
      .getOrCreate()
)

৬. জেমিনি ব্যবহার করে ডেটা লোড ও অন্বেষণ করুন।

এই অংশে, আপনি যেকোনো ডেটা সায়েন্স প্রকল্পের প্রথম গুরুত্বপূর্ণ ধাপটি অনুসরণ করবেন: আপনার ডেটা প্রস্তুত করা। আপনি BigQuery থেকে একটি Apache Spark ডেটাফ্রেমে ডেটা লোড করার মাধ্যমে শুরু করবেন।

# Load the tables
order_items = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.order_items").load()

users = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.users").load()

# Register temp tables
users.createOrReplaceTempView("users")
order_items.createOrReplaceTempView("order_items")

# Verify temp tables
spark.sql("SELECT * FROM order_items LIMIT 10").show()

তারপর, ডেটা বিশ্লেষণ করতে এবং তা আরও ভালোভাবে বুঝতে আপনি জেমিনি ব্যবহার করে পাইস্পার্ক কোড তৈরি করেন।

200d3133ea7d410b.png

নির্দেশনা ১ : PySpark ব্যবহার করে users টেবিলটি অন্বেষণ করুন এবং প্রথম ১০টি সারি দেখান।

# prompt:  Using PySpark, explore the users table and show the first 10 rows.

users.show(10)

নির্দেশ ২ : PySpark ব্যবহার করে order_items টেবিলটি অন্বেষণ করুন এবং প্রথম ১০টি সারি দেখান।

# prompt: Using PySpark, explore the order_items table and show the first 10 rows.

order_items.show(10)

নির্দেশ ৩ : PySpark ব্যবহার করে, ব্যবহারকারী টেবিলে সর্বাধিক ব্যবহৃত শীর্ষ ৫টি দেশ দেখান। প্রতিটি দেশের নাম এবং ব্যবহারকারীর সংখ্যা প্রদর্শন করুন।

# prompt: Using PySpark, show the top 5 most frequent countries in the users table. Display the country and the number of users from each country.

from pyspark.sql.functions import col, count

users.groupBy("country").agg(count("*").alias("user_count")).orderBy(col("user_count").desc()).limit(5).show()

প্রশ্ন ৪ : PySpark ব্যবহার করে order_items টেবিলের আইটেমগুলোর গড় বিক্রয় মূল্য নির্ণয় করুন।

# prompt: Using PySpark, find the average sale price of items in the order_items table.

from pyspark.sql import functions as F

average_sale_price = order_items.agg(F.avg("sale_price").alias("average_sale_price"))
average_sale_price.show()

নির্দেশ ৫ : 'users' টেবিলটি ব্যবহার করে, একটি উপযুক্ত প্লটিং লাইব্রেরির সাহায্যে দেশ বনাম ট্র্যাফিক উৎসের প্লট তৈরি করার জন্য কোড তৈরি করুন।

# prompt: Using the table "users", generate code to plot country vs traffic source using a suitable plotting library.

sql = """
    SELECT
        country,
        traffic_source
    FROM
        `bigquery-public-data.thelook_ecommerce.users`
    WHERE country IS NOT NULL AND traffic_source IS NOT NULL
"""
project_id = "iceberg-summit-2025"
df = pandas_gbq.read_gbq(sql, project_id=project_id, dialect="standard")

import matplotlib.pyplot as plt
import seaborn as sns

# Group by country and traffic_source and count occurrences
df_grouped = df.groupby(['country', 'traffic_source']).size().reset_index(name='count')

# Create a pivot table for easier plotting
pivot_table = df_grouped.pivot(index='country', columns='traffic_source', values='count').fillna(0)

# Plotting
plt.figure(figsize=(15, 8))
pivot_table.plot(kind='bar', stacked=True, figsize=(15, 8))
plt.title('Traffic Source Distribution by Country')
plt.xlabel('Country')
plt.ylabel('Number of Users')
plt.xticks(rotation=90)
plt.legend(title='Traffic Source')
plt.tight_layout()
plt.show()

নির্দেশ ৬: 'বয়স', 'দেশ', 'লিঙ্গ', 'ট্র্যাফিকের উৎস'-এর বিন্যাস দেখিয়ে একটি হিস্টোগ্রাম তৈরি করুন।

# prompt: Create a histogram showing the distribution of "age", "country", "gender", "traffic_source".

import matplotlib.pyplot as plt

# Convert Spark DataFrame to Pandas DataFrame for visualization
users_pd = users.toPandas()

# Create histograms for 'age', 'country', 'gender', 'traffic_source'
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Distribution of User Attributes')

# Age distribution
axes[0, 0].hist(users_pd['age'].dropna(), bins=20, edgecolor='black')
axes[0, 0].set_title('Age Distribution')
axes[0, 0].set_xlabel('Age')
axes[0, 0].set_ylabel('Number of Users')

# Country distribution
users_pd['country'].value_counts().head(10).plot(kind='bar', ax=axes[0, 1])
axes[0, 1].set_title('Top 10 Countries Distribution')
axes[0, 1].set_xlabel('Country')
axes[0, 1].set_ylabel('Number of Users')
axes[0, 1].tick_params(axis='x', rotation=45)

# Gender distribution
users_pd['gender'].value_counts().plot(kind='bar', ax=axes[1, 0])
axes[1, 0].set_title('Gender Distribution')
axes[1, 0].set_xlabel('Gender')
axes[1, 0].set_ylabel('Number of Users')
axes[1, 0].tick_params(axis='x', rotation=0)

# Traffic Source distribution
users_pd['traffic_source'].value_counts().head(10).plot(kind='bar', ax=axes[1, 1])
axes[1, 1].set_title('Top 10 Traffic Source Distribution')
axes[1, 1].set_xlabel('Traffic Source')
axes[1, 1].set_ylabel('Number of Users')
axes[1, 1].tick_params(axis='x', rotation=45)

plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.show()

৭. ডেটা প্রস্তুতি এবং বৈশিষ্ট্য প্রকৌশল

এরপরে, আপনি ডেটার উপর ফিচার ইঞ্জিনিয়ারিং করবেন। উপযুক্ত কলামগুলো নির্বাচন করুন, ডেটাকে আরও উপযোগী ডেটা টাইপে রূপান্তর করুন এবং একটি লেবেল কলাম চিহ্নিত করুন।

features = spark.sql("""
SELECT
  CAST(u.age AS DOUBLE) AS age,
  CAST(hash(u.country) AS BIGINT) * 1.0 AS country_hash,
  CAST(hash(u.gender) AS BIGINT) * 1.0 AS gender_hash,
  CAST(hash(u.traffic_source) AS BIGINT) * 1.0 AS traffic_source_hash,
  CASE WHEN COUNT(oi.id) > 0 THEN 1 ELSE 0 END AS label -- Changed label generation to count order items
FROM users AS u
LEFT JOIN order_items AS oi
ON u.id = oi.user_id
GROUP BY u.id, u.age, u.country, u.gender, u.traffic_source
""")
features.show()

৮. একটি লজিস্টিক রিগ্রেশন মডেলকে প্রশিক্ষণ দিন।

MLlib ব্যবহার করে আপনি একটি লজিস্টিক রিগ্রেশন মডেলকে ট্রেইন করেন। প্রথমে, আপনি ডেটাকে ভেক্টর ফরম্যাটে রূপান্তর করার জন্য একটি VectorAssembler ব্যবহার করেন। তারপর, আরও ভালো পারফরম্যান্সের জন্য StandardScaler ফিচার কলামকে স্কেল করে। এরপর, আপনি একটি LogisticRegression মডেলের রেফারেন্স তৈরি করেন এবং হাইপারপ্যারামিটারগুলো সংজ্ঞায়িত করেন। এই ধাপগুলোকে একটি Pipeline অবজেক্টে একত্রিত করে, fit() ফাংশন ব্যবহার করে মডেলটিকে ট্রেইন করেন এবং transform() ফাংশন ব্যবহার করে ডেটাকে ট্রান্সফর্ম করেন।

from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.functions import array_to_vector

#Split Train and Test Data (80:20)
train_data, test_data = features.randomSplit([0.8, 0.2], seed=42)

# Initialize VectorAssembler
assembler = VectorAssembler(
    inputCols=["age", "country_hash", "gender_hash", "traffic_source_hash"],
    outputCol="assembled_features"
)

# Initialize StandardScaler
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaled_features")

# Initialize Logistic Regression model
lr = LogisticRegression(
    maxIter=100,
    regParam=0.2,
    threshold=0.8,
    featuresCol="scaled_features",
    labelCol="label"
)

# Define pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

# Fit the model
pipeline_model = pipeline.fit(train_data)

# Transform the dataset using the trained model
transformed_dataset = pipeline_model.transform(test_data)
transformed_dataset.show()

৯. মডেল মূল্যায়ন

আপনার নতুন রূপান্তরিত ডেটাসেটটি মূল্যায়ন করুন। এরিয়া আন্ডার কার্ভ (AUC) নামক মূল্যায়ন মেট্রিকটি তৈরি করুন।

# Model evaluation
eva = BinaryClassificationEvaluator(metricName="areaUnderPR")
aucPR = eva.evaluate(transformed_dataset)
print(f"AUC PR: {aucPR}")

এরপর, আপনার মডেলের আউটপুট ভিজ্যুয়ালাইজ করার জন্য Gemini ব্যবহার করে PySpark কোড তৈরি করুন।

নির্দেশ ১: প্রিসিশন-রিকল (PR) কার্ভ অঙ্কন করার জন্য কোড তৈরি করুন। মডেলের পূর্বাভাস থেকে প্রিসিশন এবং রিকল গণনা করুন এবং একটি উপযুক্ত প্লটিং লাইব্রেরি ব্যবহার করে PR কার্ভটি প্রদর্শন করুন।

# prompt: Generate code to plot the Precision-Recall (PR) curve. Calculate precision and recall from the model's predictions and display the PR curve using a suitable plotting library.

import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, auc

# Extract predictions and labels
predictions = transformed_dataset.select("prediction", "label").toPandas()

# Calculate precision and recall
precision, recall, _ = precision_recall_curve(predictions["label"], predictions["prediction"])

# Calculate AUC-PR
pr_auc = auc(recall, precision)

# Plot the PR curve
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='blue', lw=2, label=f'PR curve (AUC = {pr_auc:.2f})')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend(loc='lower left')
plt.grid(True)
plt.show()

নির্দেশ ২: একটি কনফিউশন ম্যাট্রিক্স ভিজ্যুয়ালাইজেশন তৈরি করার জন্য কোড তৈরি করুন। মডেলের প্রেডিকশন থেকে কনফিউশন ম্যাট্রিক্সটি গণনা করুন এবং এটিকে একটি হিটম্যাপ বা টেবিল হিসাবে প্রদর্শন করুন, যেখানে ট্রু পজিটিভ (TP), ট্রু নেগেটিভ (TN), ফলস পজিটিভ (FP), এবং ফলস নেগেটিভ (FN)-এর সংখ্যা থাকবে।

# prompt: Generate code to create a confusion matrix visualization. Calculate the confusion matrix from the model's predictions and display it as a heat map or a table with counts of true positives (TP), true negatives (TN), false positives (FP), and false negatives (FN).

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

# Extract predictions and labels
predictions_and_labels = transformed_dataset.select("prediction", "label").toPandas()

# Calculate the confusion matrix
cm = confusion_matrix(predictions_and_labels["label"], predictions_and_labels["prediction"])

# Create a DataFrame for better visualization
cm_df = pd.DataFrame(cm,
                     index=['Actual Negative', 'Actual Positive'],
                     columns=['Predicted Negative', 'Predicted Positive'])

# Display the confusion matrix as a table
print("Confusion Matrix:")
print(cm_df)

# Display the confusion matrix as a heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(cm_df, annot=True, fmt='d', cmap='Blues', cbar=False, linewidths=.5)
plt.title('Confusion Matrix')
plt.ylabel('Actual Label')
plt.xlabel('Predicted Label')
plt.show()

# Calculate and display TP, TN, FP, FN
TN, FP, FN, TP = cm.ravel()
print(f"
True Positives (TP): {TP}")
print(f"True Negatives (TN): {TN}")
print(f"False Positives (FP): {FP}")
print(f"False Negatives (FN): {FN}")

১০. BigQuery-তে প্রেডিকশনগুলো লিখুন

আপনার BigQuery ডেটাসেটের একটি নতুন টেবিলে আপনার প্রেডিকশনগুলো লেখার জন্য কোড তৈরি করতে Gemini ব্যবহার করুন।

নির্দেশ: Spark ব্যবহার করে রূপান্তরিত ডেটাসেটটি BigQuery-তে লিখুন।

# prompt: Using Spark, write the transformed dataset to BigQuery.

transformed_dataset.write.format("bigquery").option("table", f"{PROJECT_ID}.demo.predictions").mode("overwrite").save()

১১. মডেলটি ক্লাউড স্টোরেজে সংরক্ষণ করুন।

MLlib-এর নিজস্ব কার্যকারিতা ব্যবহার করে আপনার মডেলটি ক্লাউড স্টোরেজে সংরক্ষণ করুন। ইনফারেন্স সার্ভার এখান থেকেই মডেলটি লোড করে।

MODEL_PATH = "models/prediction_model"
pipeline_model.write().overwrite().save(f"gs://{BUCKET_NAME}/{MODEL_PATH}")

১২. একটি ইনফারেন্স সার্ভার তৈরি করুন

ক্লাউড রান হলো সার্ভারলেস ওয়েব অ্যাপ চালানোর একটি নমনীয় টুল। এটি ব্যবহারকারীদের সর্বোচ্চ কাস্টমাইজেশনের সুযোগ দিতে ডকার কন্টেইনার ব্যবহার করে। এই ল্যাবের জন্য, পাইস্পার্ক চালিত একটি ফ্লাস্ক অ্যাপ চালানোর জন্য একটি ডকারফাইল কনফিগার করা হয়েছে। এই কন্টেইনারটি ইনপুট ডেটার উপর ইনফারেন্স সম্পাদন করার জন্য ক্লাউড রানে চলে। এর কোডটি এখানে পাওয়া যাবে।

ইনফারেন্স সার্ভার কোড সহ রিপোটি ক্লোন করুন।

!git clone https://github.com/GoogleCloudPlatform/devrel-demos.git

ডকারফাইলটি দেখুন।

FROM python:3.12-slim

# Install OpenJDK-21 (Required for Spark)
RUN apt-get update && \
    apt-get install -y openjdk-21-jre-headless procps && \
    rm -rf /var/lib/apt/lists/*

ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
ENV PORT=8080

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY main.py .

CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "1", "--threads", "8", "--timeout", "0", "main:app"]

সার্ভারটির পাইথন কোড দেখুন।

import os
import json
import logging

from flask import Flask, request, jsonify
from google.cloud import storage
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import hash, col

# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Initialization: Spark and Model Loading ---
GCS_BUCKET = os.environ.get("GCS_BUCKET")
GCS_MODEL_PATH = os.environ.get("GCS_MODEL_PATH")
LOCAL_MODEL_PATH = "/tmp/model"

try:
    spark = SparkSession.builder \
        .appName("CloudRunSparkService") \
        .master("local[*]") \
        .getOrCreate()
    logging.info("Spark Session successfully initialized.")
except Exception as e:
    logging.error(f"Failed to initialize Spark Session: {e}")
    raise

def download_directory(bucket_name, prefix, local_path):
    """Downloads a directory from GCS to local filesystem."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=prefix))
    
    if len(blobs) == 0:
        logging.error(f"No files found in GCS bucket {bucket_name} at prefix {prefix}")
        return
    
    for blob in blobs:
        if blob.name.endswith("/"): continue # Skip directories
        
        # Structure local paths
        relative_path = os.path.relpath(blob.name, prefix)
        local_file_path = os.path.join(local_path, relative_path)
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
        
        blob.download_to_filename(local_file_path)
    print(f"Model downloaded to {local_path}")

# Load model
def load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH):
    """Download and load model on startup to avoid latency per request."""
    global MODEL
    if not os.path.exists(LOCAL_MODEL_PATH):
        download_directory(GCS_BUCKET, GCS_MODEL_PATH, LOCAL_MODEL_PATH)
    
    logging.info(f"Loading PySpark model from {GCS_MODEL_PATH}")
    # Load the Spark ML model
    try:
        MODEL = PipelineModel.load(LOCAL_MODEL_PATH)
        logging.info("Spark model loaded successfully.")
    except Exception as e:
        logging.error(f"Failed to load model: {e}")
        raise
    
# Load Model on Startup
load_model(LOCAL_MODEL_PATH, GCS_BUCKET, GCS_MODEL_PATH)

# --- Flask Application Setup ---
app = Flask(__name__)

@app.route('/predict', methods=['POST'])
def predict():
    """
    Handles incoming POST requests for inference.
    Expects JSON data that can be converted into a Spark DataFrame.
    """
    if MODEL is None:
        return jsonify({"error": "Model failed to load at startup."}), 500

    try:
        # 1. Get data from the request
        data = request.get_json()
        
        # 2. Check length of list
        data_len = len(data)
        cap = 100
        if data_len > cap:
            return jsonify({"error": f"Too many records. Count: {data_len}, Max: {cap}"}), 400

        # 2. Create Spark DataFrame
        df = spark.createDataFrame(data)
        
        # 3. Transform data
        input_df = df.select(
            col("age").cast("DOUBLE").alias("age"), 
            (hash(col("country")).cast("BIGINT") * 1.0).alias("country_hash"),
            (hash(col("gender")).cast("BIGINT") * 1.0).alias("gender_hash"),
            (hash(col("traffic_source")).cast("BIGINT") * 1.0).alias("traffic_source_hash")
        )

        # 3. Perform Inference
        predictions_df = MODEL.transform(input_df)

        # 4. Prepare results (collect and serialize)
        results = [p.prediction for p in predictions_df.select("prediction").collect()]

        # 5. Return JSON response
        return jsonify({"predictions": results})

    except Exception as e:
        logging.error(f"An error occurred during prediction: {e}")
        #return jsonify({"error": str(e)}), 500
        raise e  
    
# Gunicorn entry point uses 'app' from this file
if __name__ == '__main__':
    # Local testing only: Cloud Run uses Gunicorn/CMD command
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

ইনফারেন্স সার্ভারটি স্থাপন করুন।

import subprocess

command = [
    "gcloud",
    "run",
    "deploy",
    "inference-server",
    "--source",
    "/content/devrel-demos/data-analytics/dataproc-webinar/data-science/inference-server",
    "--region",
    f"{REGION}",
    "--port",
    "8080",
    "--memory",
    "2Gi",
    "--allow-unauthenticated",
    "--set-env-vars",
    f"GCS_BUCKET={BUCKET_NAME},GCS_MODEL_PATH={MODEL_PATH}",
    "--startup-probe",
    "tcpSocket.port=8080,initialDelaySeconds=240,failureThreshold=3,timeoutSeconds=240,periodSeconds=240"
]

result = subprocess.run(command, capture_output=True, text=True)
print(result.stdout)
print(result.stderr)

আউটপুট থেকে ইনফারেন্স সার্ভার URL-টি কপি করে একটি নতুন ভেরিয়েবলে রাখুন। এটি https://inference-server-123456789.us-central1.run.app.

INFERENCE_SERVER_URL = "<YOUR_SERVER_URL>"

ইনফারেন্স সার্ভারটি পরীক্ষা করুন।

import requests

age = "25.0"
country = "United States"
traffic_source = "Search"
gender = "F"

response = requests.post(
    f"{INFERENCE_SERVER_URL}/predict",
    json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
    headers={"Content-Type": "application/json"}
)

print(response.json())

আউটপুটটি 1.0 অথবা 0.0 হওয়া উচিত।

{'predictions': [1.0]}

১৩. একজন এজেন্ট কনফিগার করুন

ইনফারেন্স সম্পাদন করতে পারে এমন একটি এজেন্ট তৈরি করতে এজেন্ট ইঞ্জিন ব্যবহার করুন। এজেন্ট ইঞ্জিন হলো ভার্টেক্স এআই প্ল্যাটফর্মের একটি অংশ, যা এমন কিছু পরিষেবার সমষ্টি যা ডেভেলপারদের প্রোডাকশনে এআই এজেন্ট স্থাপন, পরিচালনা এবং স্কেল করতে সক্ষম করে। এতে এজেন্ট মূল্যায়ন, সেশন কনটেক্সট এবং কোড এক্সিকিউশন সহ অনেক টুল রয়েছে। এটি এজেন্ট ডেভেলপমেন্ট কিট (ADK) সহ অনেক জনপ্রিয় এজেন্টিক ফ্রেমওয়ার্ক সমর্থন করে। ADK একটি ওপেন সোর্স এজেন্টিক ফ্রেমওয়ার্ক যা জেমিনি এবং গুগল ইকোসিস্টেমের সাথে ব্যবহারের জন্য তৈরি ও অপ্টিমাইজ করা হলেও, এটি মডেল-অ্যাগনোটিক। এটি এমনভাবে ডিজাইন করা হয়েছে যাতে এজেন্ট ডেভেলপমেন্টকে সফটওয়্যার ডেভেলপমেন্টের মতো মনে হয়।

ভার্টেক্স এআই ক্লায়েন্টটি চালু করুন।

import vertexai
from vertexai import agent_engines # For the prebuilt templates

client = vertexai.Client(  # For service interactions via client.agent_engines
    project=f"{PROJECT_ID}",
    location=f"{REGION}",
)

ডেপ্লয় করা মডেলটি কোয়েরি করার জন্য একটি ফাংশন সংজ্ঞায়িত করুন।

def predict_purchase(
    age: str = "25.0",
    country: str = "United States",
    traffic_source: str = "Search",
    gender: str = "M",
):
    """Predicts whether or not a user will purchase a product.

    Args:
        age: The age of the user.
        country: The country of the user. One of: "China", "Poland", "Germany", "United States", "Spain", "United Kingdom", "España", "Japan", "Brasil", "Colombia", "Belgium", "South Korea", "Austria", "France", "Australia".
        Traffic_source: The source of the user's traffic. One of: "Display", "Email", "Search", "Organic", "Facebook".
        gender: The gender of the user. One of: "M" or "F".

    Returns:
        True if the model output is 1.0, False otherwise.
    """
    import requests
    response = requests.post(
        f"{INFERENCE_SERVER_URL}/predict",
        json=[{"age": age, "country": country, "traffic_source": traffic_source, "gender": gender}],
        headers={"Content-Type": "application/json"}
    )
    return response.json()

নমুনা প্যারামিটার প্রদান করে ফাংশনটি পরীক্ষা করুন।

predict_purchase(age=25.0, country="United States", traffic_source="Search", gender="M")

ADK ব্যবহার করে, নিচে একটি এজেন্ট সংজ্ঞায়িত করুন এবং টুল হিসেবে predict_purchase ফাংশনটি প্রদান করুন।

from google.adk.agents import Agent
from vertexai import agent_engines

agent = Agent(
   model="gemini-2.5-flash",
   name='purchase_prediction_agent',
   tools=[predict_purchase]
)

একটি কোয়েরি পাঠিয়ে এজেন্টটিকে স্থানীয়ভাবে পরীক্ষা করুন।

app = agent_engines.AdkApp(agent=agent)
async for event in app.async_stream_query(
    user_id="123",
    message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
    try:
        print(event['content']['parts'][0]['text'])
    except:
      continue

মডেলটি এজেন্ট ইঞ্জিনে স্থাপন করুন।

remote_agent = client.agent_engines.create(
    agent=app,
    config={
        "requirements": ["google-cloud-aiplatform[agent_engines,adk]"],
        "staging_bucket": f"gs://{BUCKET_NAME}",
        "display_name": "purchase-predictor",
        "description": "Agent that predicts whether or not a user will purchase a product.",
    }
)

কাজটি সম্পন্ন হলে, ক্লাউড কনসোলে ডেপ্লয় করা মডেলটি দেখুন।

মডেলটি আবার কোয়েরি করুন। এটি এখন লোকাল ভার্সনের পরিবর্তে ডেপ্লয়েড এজেন্টকে নির্দেশ করছে।

async for event in remote_agent.async_stream_query(
    user_id="123",
    message="Will a 25 yo male from the United States who came from Search make a purchase? Strictly output 'yes' or 'no'.",
):
    try:
        print(event['content']['parts'][0]['text'])
    except:
      continue

১৪. পরিষ্কার করা

আপনার তৈরি করা সমস্ত গুগল ক্লাউড রিসোর্স মুছে ফেলুন। ভবিষ্যতে চার্জ এড়ানোর জন্য এই ধরনের ক্লিনআপ কমান্ড চালানো একটি অত্যন্ত গুরুত্বপূর্ণ উত্তম অভ্যাস।

# Delete the deployed agent.
remote_agent.delete(force=True)

# Delete the inference server.
import subprocess

command = [
    "gcloud",
    "run",
    "services",
    "delete",
    "inference-server",
    "--region",
    f"{REGION}",
    "--quiet"
]

subprocess.run(command, capture_output=True, text=True)

# Delete the BigQuery dataset.
bigquery_client = bigquery.Client()

bigquery_client.delete_dataset(
    f"{PROJECT_ID}.demo", delete_contents=True, not_found_ok=True
)

# Delete the Storage bucket.
storage_client = storage.Client()

bucket = storage_client.get_bucket(BUCKET_NAME)
bucket.delete_blobs(list(bucket.list_blobs()))
bucket.delete()

১৫. অভিনন্দন!

আপনি পেরেছেন! এই কোডল্যাবে, আপনি নিম্নলিখিত কাজগুলো করেছেন:

  • ডেটা সায়েন্স ওয়ার্কফ্লো চালানোর জন্য BigQuery Studio নোটবুক ব্যবহার করা হয়েছে।
  • Google Cloud Serverless for Apache Spark ব্যবহার করে এবং Spark Connect দ্বারা চালিত Apache Spark-এর সাথে একটি সংযোগ তৈরি করা হয়েছে।
  • অ্যাপাচি স্পার্ক ওয়ার্কলোডের গতি ৪.৩ গুণ পর্যন্ত বাড়াতে লাইটনিং ইঞ্জিন ব্যবহার করা হয়েছে।
  • Apache Spark এবং BigQuery-এর মধ্যকার অন্তর্নির্মিত ইন্টিগ্রেশন ব্যবহার করে BigQuery থেকে ডেটা লোড করা হয়েছে।
  • জেমিনি-সহায়তায় কোড জেনারেশনের মাধ্যমে ডেটা বিশ্লেষণ করা হয়েছে।
  • অ্যাপাচি স্পার্কের ডেটা প্রসেসিং ফ্রেমওয়ার্ক ব্যবহার করে ফিচার ইঞ্জিনিয়ারিং সম্পন্ন করা হয়েছে।
  • অ্যাপাচি স্পার্কের নিজস্ব মেশিন লার্নিং লাইব্রেরি, এমএললিব (MLlib ) ব্যবহার করে একটি ক্লাসিফিকেশন মডেলকে প্রশিক্ষণ ও মূল্যায়ন করা হয়েছে।
  • Flask এবং Cloud Run ব্যবহার করে ক্লাসিফিকেশন মডেলের জন্য একটি ইনফারেন্স সার্ভার স্থাপন করা হয়েছে।
  • এজেন্ট ইঞ্জিন এবং এজেন্ট ডেভেলপমেন্ট কিট (ADK) ব্যবহার করে স্বাভাবিক ভাষায় ইনফারেন্স সার্ভারকে কোয়েরি করার জন্য একটি এজেন্ট স্থাপন করা হয়েছে।

এরপর কী?