gRPC-Rust দিয়ে শুরু করা - স্ট্রিমিং

১. ভূমিকা

এই কোডল্যাবে, আপনি gRPC-Rust ব্যবহার করে একটি ক্লায়েন্ট ও সার্ভার তৈরি করবেন, যা Rust-এ লেখা একটি রাউট-ম্যাপিং অ্যাপ্লিকেশনের ভিত্তি স্থাপন করবে।

এই টিউটোরিয়ালটি শেষ করার পর, আপনি এমন একটি ক্লায়েন্ট তৈরি করতে পারবেন যা gRPC ব্যবহার করে একটি রিমোট সার্ভারের সাথে সংযোগ স্থাপন করে ক্লায়েন্টের রুটের ফিচারগুলো সম্পর্কে তথ্য সংগ্রহ করতে, ক্লায়েন্টের রুটের একটি সারাংশ তৈরি করতে এবং সার্ভার ও অন্যান্য ক্লায়েন্টদের সাথে ট্র্যাফিক আপডেটের মতো রুটের তথ্য আদান-প্রদান করতে পারবে।

সার্ভিসটি একটি প্রোটোকল বাফারস ফাইলে সংজ্ঞায়িত করা আছে, যা ক্লায়েন্ট এবং সার্ভারের জন্য বয়লারপ্লেট কোড তৈরি করতে ব্যবহৃত হবে, যাতে তারা একে অপরের সাথে যোগাযোগ করতে পারে। এর ফলে ঐ কার্যকারিতাটি বাস্তবায়নে আপনার সময় ও শ্রম বাঁচবে।

এই জেনারেট করা কোডটি শুধু সার্ভার ও ক্লায়েন্টের মধ্যকার যোগাযোগের জটিলতাই নয়, ডেটার সিরিয়ালাইজেশন এবং ডিসিরিয়ালাইজেশনও সামলে নেয়।

আপনি যা শিখবেন

  • সার্ভিস এপিআই সংজ্ঞায়িত করতে প্রোটোকল বাফার কীভাবে ব্যবহার করবেন
  • স্বয়ংক্রিয় কোড জেনারেশন ব্যবহার করে প্রোটোকল বাফারস ডেফিনিশন থেকে কীভাবে একটি gRPC-ভিত্তিক ক্লায়েন্ট এবং সার্ভার তৈরি করা যায়।
  • gRPC ব্যবহার করে ক্লায়েন্ট-সার্ভার স্ট্রিমিং যোগাযোগ সম্পর্কে ধারণা।

এই কোডল্যাবটি সেইসব রাস্ট ডেভেলপারদের জন্য তৈরি করা হয়েছে যারা gRPC-তে নতুন অথবা এর বিষয়ে নিজেদের জ্ঞান ঝালিয়ে নিতে চান, কিংবা যারা ডিস্ট্রিবিউটেড সিস্টেম তৈরিতে আগ্রহী। gRPC-তে পূর্ব অভিজ্ঞতার কোনো প্রয়োজন নেই।

২. শুরু করার আগে

পূর্বশর্ত

নিশ্চিত করুন যে আপনি নিম্নলিখিতগুলি ইনস্টল করেছেন:

  • জিসিসি। এখানে দেওয়া নির্দেশাবলী অনুসরণ করুন।
  • রাস্ট-এর সর্বশেষ সংস্করণ। এখানে দেওয়া ইনস্টলেশন নির্দেশাবলী অনুসরণ করুন।

কোডটি নিন

যাতে আপনাকে একেবারে গোড়া থেকে শুরু করতে না হয়, সেজন্য এই কোডল্যাবটি অ্যাপ্লিকেশনটির সোর্স কোডের একটি কাঠামো প্রদান করে, যা আপনাকে সম্পূর্ণ করতে হবে। নিম্নলিখিত ধাপগুলো আপনাকে দেখাবে কীভাবে অ্যাপ্লিকেশনটি শেষ করতে হয়, যার মধ্যে প্রোটোকল বাফার কম্পাইলার প্লাগইন ব্যবহার করে বয়লারপ্লেট gRPC কোড তৈরি করার পদ্ধতিও অন্তর্ভুক্ত রয়েছে।

প্রথমে, কোডল্যাব ওয়ার্কিং ডিরেক্টরি তৈরি করুন এবং তার ভেতরে cd :

mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started

কোডল্যাবটি ডাউনলোড এবং এক্সট্র্যাক্ট করুন:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here

বিকল্পভাবে, আপনি শুধু কোডল্যাব ডিরেক্টরি সম্বলিত .zip ফাইলটি ডাউনলোড করে ম্যানুয়ালি আনজিপ করতে পারেন।

আপনি যদি ইমপ্লিমেন্টেশন টাইপ করা এড়াতে চান, তাহলে সম্পূর্ণ সোর্স কোডটি গিটহাবে পাওয়া যাবে

৩. বার্তা এবং পরিষেবাগুলি সংজ্ঞায়িত করুন

আপনার প্রথম পদক্ষেপ হলো প্রোটোকল বাফার ব্যবহার করে অ্যাপ্লিকেশনটির gRPC পরিষেবা, এর RPC পদ্ধতিসমূহ এবং এর অনুরোধ ও প্রতিক্রিয়া বার্তার ধরণগুলো সংজ্ঞায়িত করা। আপনার পরিষেবাটি প্রদান করবে:

  • ListFeatures , RecordRoute , এবং RouteChat নামক RPC মেথডগুলো, যেগুলো সার্ভার ইমপ্লিমেন্ট করে এবং ক্লায়েন্ট কল করে।
  • Point , Feature , Rectangle , RouteNote এবং RouteSummary হলো মেসেজ টাইপ, যেগুলো হলো ডেটা স্ট্রাকচার এবং উপরের মেথডগুলো কল করার সময় ক্লায়েন্ট ও সার্ভারের মধ্যে আদান-প্রদান করা হয়।

এই RPC মেথডগুলো এবং এর মেসেজ টাইপগুলো প্রদত্ত সোর্স কোডের proto/routeguide.proto ফাইলে সংজ্ঞায়িত করা থাকবে।

প্রোটোকল বাফারগুলো সাধারণত প্রোটোবাফ নামে পরিচিত। gRPC পরিভাষা সম্পর্কে আরও তথ্যের জন্য, gRPC-এর মূল ধারণা, স্থাপত্য এবং জীবনচক্র দেখুন।

বার্তার প্রকারভেদ সংজ্ঞায়িত করুন

চলুন প্রথমে আমাদের RPC-গুলোতে ব্যবহৃত মেসেজগুলো সংজ্ঞায়িত করি। সোর্স কোডের routeguide/route_guide.proto ফাইলে, প্রথমে Point মেসেজ টাইপটি সংজ্ঞায়িত করুন। একটি Point মানচিত্রে একটি অক্ষাংশ-দ্রাঘিমাংশ স্থানাঙ্ক জোড়াকে বোঝায়। এই কোডল্যাবের জন্য, স্থানাঙ্ক হিসেবে পূর্ণসংখ্যা ব্যবহার করুন:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

1 এবং 2 সংখ্যা দুটি হলো message কাঠামোর প্রতিটি ফিল্ডের অনন্য শনাক্তকরণ নম্বর।

এরপর, Feature মেসেজ টাইপটি সংজ্ঞায়িত করুন। একটি Feature , Point দ্বারা নির্দিষ্ট কোনো অবস্থানে থাকা কোনো কিছুর নাম বা ডাক ঠিকানার জন্য একটি string ফিল্ড ব্যবহার করে।

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

এরপর একটি Rectangle বার্তা, যা একটি অক্ষাংশ-দ্রাঘিমাংশ আয়তক্ষেত্রকে বোঝায় এবং এটিকে 'lo' ও 'hi' নামক দুটি তির্যকভাবে বিপরীত বিন্দু দ্বারা প্রকাশ করা হয়।

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

এছাড়াও একটি RouteNote বার্তা, যা কোনো নির্দিষ্ট সময়ে পাঠানো একটি বার্তাকে প্রতিনিধিত্ব করে।

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

আমাদের একটি RouteSummary মেসেজেরও প্রয়োজন হবে। এই মেসেজটি একটি RecordRoute RPC-এর প্রতিক্রিয়ায় পাওয়া যায়, যা পরবর্তী বিভাগে ব্যাখ্যা করা হয়েছে। এতে প্রাপ্ত স্বতন্ত্র পয়েন্টের সংখ্যা, শনাক্তকৃত ফিচারের সংখ্যা এবং প্রতিটি পয়েন্টের মধ্যবর্তী দূরত্বের ক্রমসঞ্চয়ী যোগফল হিসাবে অতিক্রান্ত মোট দূরত্ব অন্তর্ভুক্ত থাকে।

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

পরিষেবা পদ্ধতিগুলি সংজ্ঞায়িত করুন

চলুন প্রথমে আমাদের সার্ভিসটি সংজ্ঞায়িত করি এবং পরে মেসেজগুলো সংজ্ঞায়িত করব। একটি সার্ভিস সংজ্ঞায়িত করতে, আপনাকে আপনার .proto ফাইলে একটি নামযুক্ত সার্ভিস উল্লেখ করতে হবে। proto/routeguide.proto ফাইলটিতে RouteGuide নামের একটি service স্ট্রাকচার রয়েছে, যা অ্যাপ্লিকেশনটির সার্ভিস দ্বারা প্রদত্ত এক বা একাধিক মেথড সংজ্ঞায়িত করে।

আপনার সার্ভিস ডেফিনিশনের ভিতরে RPC মেথডগুলো সংজ্ঞায়িত করুন এবং সেগুলোর রিকোয়েস্ট ও রেসপন্স টাইপ উল্লেখ করুন। কোডল্যাবের এই অংশে, চলুন সংজ্ঞায়িত করা যাক:

তালিকা বৈশিষ্ট্য

প্রদত্ত Rectangle মধ্যে উপলব্ধ Feature সংগ্রহ করে। ফলাফলগুলি একবারে ফেরত না দিয়ে (যেমন, একটি পুনরাবৃত্ত ক্ষেত্র সহ প্রতিক্রিয়া বার্তায়) ধারাবাহিকভাবে পাঠানো হয়, কারণ আয়তক্ষেত্রটি একটি বড় এলাকা জুড়ে থাকতে পারে এবং এতে বিপুল সংখ্যক বৈশিষ্ট্য থাকতে পারে।

এই RPC-এর জন্য একটি উপযুক্ত ধরণ হলো সার্ভার-সাইড স্ট্রিমিং RPC: ক্লায়েন্ট সার্ভারে একটি অনুরোধ পাঠায় এবং ক্রমানুসারে বার্তা পড়ার জন্য একটি স্ট্রিম ফেরত পায়। ক্লায়েন্ট ফেরত আসা স্ট্রিমটি থেকে ততক্ষণ পর্যন্ত পড়তে থাকে যতক্ষণ না আর কোনো বার্তা অবশিষ্ট থাকে। আমাদের উদাহরণে যেমনটি দেখতে পাচ্ছেন, রেসপন্স টাইপের আগে ' stream কীওয়ার্ডটি বসিয়ে একটি সার্ভার-সাইড স্ট্রিমিং পদ্ধতি নির্দিষ্ট করা হয়।

rpc ListFeatures(Rectangle) returns (stream Feature) {}

রেকর্ডরুট

অতিক্রম করা হচ্ছে এমন কোনো রুটের উপর থাকা Point একটি ধারা গ্রহণ করে এবং অতিক্রমণ সম্পন্ন হলে একটি RouteSummary ফেরত দেয়।

এই ক্ষেত্রে একটি ক্লায়েন্ট-সাইড স্ট্রিমিং RPC উপযুক্ত বলে মনে হয়: ক্লায়েন্ট একটি প্রদত্ত স্ট্রিম ব্যবহার করে ধারাবাহিকভাবে বার্তা লেখে এবং সার্ভারে পাঠায়। বার্তা লেখা শেষ হয়ে গেলে, ক্লায়েন্ট সেগুলোর সব পড়ে প্রতিক্রিয়া ফেরত পাঠানোর জন্য সার্ভারের অপেক্ষা করে। রিকোয়েস্ট টাইপের আগে ' stream ' কীওয়ার্ডটি বসিয়ে একটি ক্লায়েন্ট-সাইড স্ট্রিমিং পদ্ধতি নির্দিষ্ট করা হয়।

rpc RecordRoute(stream Point) returns (RouteSummary) {}

রুটচ্যাট

একটি রুট অতিক্রম করার সময় পাঠানো RouteNote এর একটি ধারা গ্রহণ করে, এবং একই সাথে অন্যান্য RouteNote ও (যেমন অন্য ব্যবহারকারীদের কাছ থেকে) গ্রহণ করে।

দ্বিমুখী স্ট্রিমিং-এর জন্য এটি একদম সঠিক একটি ব্যবহার। একটি দ্বিমুখী স্ট্রিমিং RPC-তে উভয় পক্ষই একটি রিড-রাইট স্ট্রিম ব্যবহার করে ধারাবাহিকভাবে বার্তা পাঠায়। এই দুটি স্ট্রিম স্বাধীনভাবে কাজ করে, ফলে ক্লায়েন্ট এবং সার্ভার তাদের পছন্দমতো যেকোনো ক্রমে পড়তে ও লিখতে পারে।

উদাহরণস্বরূপ, সার্ভার তার প্রতিক্রিয়া লেখার আগে ক্লায়েন্টের সমস্ত বার্তা পাওয়ার জন্য অপেক্ষা করতে পারে, অথবা এটি পর্যায়ক্রমে একটি বার্তা পড়ে তারপর একটি বার্তা লিখতে পারে, কিংবা পড়া এবং লেখার অন্য কোনো সংমিশ্রণ ব্যবহার করতে পারে।

প্রতিটি স্ট্রিমে মেসেজগুলোর ক্রম সংরক্ষিত থাকে। রিকোয়েস্ট এবং রেসপন্স উভয়ের আগে stream কীওয়ার্ডটি বসিয়ে এই ধরনের পদ্ধতি নির্দিষ্ট করা হয়।

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

৪. ক্লায়েন্ট এবং সার্ভার কোড তৈরি করুন

আমরা আপনাকে generated ডিরেক্টরিতে থাকা .proto ফাইল থেকে তৈরি করা কোডটি ইতিমধ্যেই দিয়ে দিয়েছি।

যদি আপনি নিজে .proto ফাইল থেকে কোড তৈরি করা শিখতে চান অথবা .proto ফাইলে কোনো পরিবর্তন করে তা পরীক্ষা করে দেখতে চান, তাহলে এই নির্দেশাবলী দেখুন।

তৈরি করা কোডটিতে রয়েছে:

  • Point , Feature , Rectangle , RouteNote এবং RouteSummary মেসেজ টাইপগুলোর জন্য স্ট্রাক্ট সংজ্ঞা।
  • আমাদের একটি সার্ভিস ট্রেইট ইমপ্লিমেন্ট করতে হবে: route_guide_server::RouteGuide
  • সার্ভারকে কল করার জন্য আমরা যে ক্লায়েন্ট টাইপটি ব্যবহার করব তা হলো: route_guide_client::RouteGuideClient<T>

এরপরে, আমরা সার্ভার-সাইডে মেথডগুলো ইমপ্লিমেন্ট করব, যাতে ক্লায়েন্ট কোনো রিকোয়েস্ট পাঠালে সার্ভার তার উত্তর দিতে পারে।

৫. পরিষেবাটি বাস্তবায়ন করুন

প্রথমে দেখা যাক আমরা কীভাবে একটি RouteGuide সার্ভার তৈরি করি। আমাদের RouteGuide সার্ভিসকে তার কাজ করানোর জন্য দুটি অংশ রয়েছে:

  • আমাদের সার্ভিস ডেফিনিশন থেকে তৈরি সার্ভিস ইন্টারফেসটি ইমপ্লিমেন্ট করা: অর্থাৎ আমাদের সার্ভিসের আসল 'কাজ'টি করা।
  • ক্লায়েন্টদের কাছ থেকে অনুরোধ শোনার জন্য এবং সেগুলোকে সঠিক মেথড ইমপ্লিমেন্টেশনে পাঠানোর জন্য একটি gRPC সার্ভার চালানো হচ্ছে।

src/server/server.rs ফাইলে, আমরা gRPC-এর include_generated_proto! ম্যাক্রো ব্যবহার করে জেনারেট করা কোডকে স্কোপের মধ্যে আনতে পারি এবং RouteGuide ট্রেইট ও Point ইম্পোর্ট করতে পারি।

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

pub use grpc_pb::{
    route_guide_server::{RouteGuideServer, RouteGuide},
    Point, Feature, Rectangle, RouteNote, RouteSummary
};

আমরা আমাদের সার্ভিসকে উপস্থাপন করার জন্য একটি স্ট্রাক্ট (struct) সংজ্ঞায়িত করার মাধ্যমে শুরু করতে পারি। আপাতত, আমরা এটি src/server/server.rs ফাইলে করতে পারি:

#[derive(Debug)]
pub struct RouteGuideService {
    features: Vec<Feature>,
}

এখন, আমাদের জেনারেটেড কোড থেকে route_guide_server::RouteGuide ট্রেইটটি ইমপ্লিমেন্ট করতে হবে।

রুটগাইড বাস্তবায়ন করুন

আমাদের তৈরি করা RouteGuide ইন্টারফেসটি ইমপ্লিমেন্ট করতে হবে। ইমপ্লিমেন্টেশনটি দেখতে এইরকম হবে। এটি ইতিমধ্যেই টেমপ্লেটে রয়েছে।

#[tonic::async_trait]
impl RouteGuide for RouteGuideService {
    async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        ...
    }

    async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        ...
    }

    async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        ...
    }
}

চলুন প্রতিটি RPC বাস্তবায়ন বিস্তারিতভাবে দেখে নেওয়া যাক।

সার্ভার-সাইড স্ট্রিমিং RPC

চলুন ListFeatures দিয়ে শুরু করা যাক। এটি একটি সার্ভার-সাইড স্ট্রিমিং RPC, তাই আমাদের ক্লায়েন্টের কাছে একাধিক Feature ফেরত পাঠাতে হবে।

async fn list_features(
        &self,
        request: Request<Rectangle>,
    ) -> Result<Response<ListFeaturesStream>, Status> {
        println!("ListFeatures = {:?}", request);

        let (tx, rx) = mpsc::channel(4);
        let features = self.features.clone();

        tokio::spawn(async move {
            for feature in &features[..] {
                if in_range(&feature.location().to_owned(), request.get_ref()) {
                    println!("  => send {feature:?}");
                    tx.send(Ok(feature.clone())).await.unwrap();
                }
            }
            println!(" /// done sending");
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(output_stream)))
    }

যেমনটি দেখতে পাচ্ছেন, আমরা একটি রিকোয়েস্ট অবজেক্ট পাই (সেই Rectangle , যার মধ্যে আমাদের ক্লায়েন্ট Features খুঁজতে চায়)। এবার, আমাদের ভ্যালুগুলোর একটি স্ট্রিম রিটার্ন করতে হবে। আমরা একটি চ্যানেল তৈরি করি এবং একটি নতুন অ্যাসিঙ্ক্রোনাস টাস্ক স্পন করি, যেখানে আমরা একটি লুকআপ সম্পাদন করি এবং আমাদের শর্তগুলো পূরণকারী ফিচারগুলোকে চ্যানেলে পাঠিয়ে দিই। চ্যানেলের স্ট্রিম অংশটি একটি tonic::Response এর মধ্যে র‍্যাপ করে কলারের কাছে রিটার্ন করা হয়।

ক্লায়েন্ট-সাইড স্ট্রিমিং আরপিসি

এবার চলুন আরেকটু জটিল একটি বিষয় দেখা যাক: ক্লায়েন্ট-সাইড স্ট্রিমিং মেথড RecordRoute , যেখানে আমরা ক্লায়েন্টের কাছ থেকে Points এর একটি স্ট্রিম গ্রহণ করি এবং তাদের ভ্রমণ সম্পর্কিত তথ্যসহ একটিমাত্র RouteSummary ফেরত দিই। এটি ইনপুট হিসেবে একটি স্ট্রিম গ্রহণ করে, যা সার্ভার মেসেজ পড়া এবং লেখার জন্য ব্যবহার করতে পারে। এটি তার next() মেথড ব্যবহার করে ক্লায়েন্টের মেসেজগুলোর মধ্যে দিয়ে পুনরাবৃত্তি করতে পারে এবং তার একক প্রতিক্রিয়াটি ফেরত দিতে পারে।

async fn record_route(
        &self,
        request: Request<tonic::Streaming<Point>>,
    ) -> Result<Response<RouteSummary>, Status> {
        println!("RecordRoute");
        let mut stream = request.into_inner();
        let mut summary = RouteSummary::default();
        let mut last_point = None;
        let now = Instant::now();

        while let Some(point) = stream.next().await {
            let point = point?;
            println!("  ==> Point = {point:?}");

            // Increment the point count
            summary.set_point_count(summary.point_count() + 1);

            // Find features
            for feature in &self.features[..] {
                if feature.location().latitude() == point.latitude() {
                    if feature.location().longitude() == point.longitude(){
                        summary.set_feature_count(summary.feature_count() + 1);
                    }
                }
            }

            // Calculate the distance
            if let Some(ref last_point) = last_point {
                let new_dist = summary.distance() + calc_distance(last_point, &point);
                summary.set_distance(new_dist);
            }
            last_point = Some(point);
        }
        summary.set_elapsed_time(now.elapsed().as_secs() as i32);
        Ok(Response::new(summary))
    }

মেথড বডির ভেতরে, আমরা স্ট্রিমের next() মেথড ব্যবহার করে আমাদের ক্লায়েন্টের অনুরোধগুলো একটি রিকোয়েস্ট অবজেক্টে (এই ক্ষেত্রে একটি Point ) বারবার পড়তে থাকি, যতক্ষণ না আর কোনো মেসেজ বাকি থাকে। যদি এর মান None হয়, তাহলে স্ট্রিমটি তখনও সচল থাকে এবং পড়া চালিয়ে যেতে পারে।

দ্বিমুখী স্ট্রিমিং আরপিসি

অবশেষে, চলুন আমাদের দ্বিমুখী স্ট্রিমিং RPC RouteChat() দেখি।

async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<RouteChatStream>, Status> {
        println!("RouteChat");

        let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
        let mut stream = request.into_inner();

        let output = async_stream::try_stream! {
            while let Some(note) = stream.next().await {
                let note = note?;
                let location = note.location();
                let key = (location.latitude(), location.longitude());
                let location_notes = notes.entry(key).or_insert(vec![]);
                location_notes.push(note);
                for note in location_notes {
                    yield note.clone();
                }
            }
        };
        Ok(Response::new(Box::pin(output)))
    }

এবার আমরা একটি স্ট্রিম পাই যা, আমাদের ক্লায়েন্ট-সাইড স্ট্রিমিং উদাহরণের মতোই, মেসেজ পড়া এবং লেখার জন্য ব্যবহার করা যেতে পারে। তবে, এবার আমরা আমাদের মেথডের স্ট্রিমের মাধ্যমে ভ্যালু রিটার্ন করি, যখন ক্লায়েন্ট তখনও তাদের মেসেজ স্ট্রিমে মেসেজ লিখতে থাকে। এখানে পড়া এবং লেখার সিনট্যাক্স আমাদের ক্লায়েন্ট-স্ট্রিমিং মেথডের মতোই, শুধু পার্থক্য হলো সার্ভার একটি RouteChatStream রিটার্ন করে। যদিও উভয় পক্ষই একে অপরের মেসেজগুলো লেখার ক্রমানুসারে পাবে, ক্লায়েন্ট এবং সার্ভার উভয়ই যেকোনো ক্রমে পড়তে এবং লিখতে পারে — স্ট্রিমগুলো সম্পূর্ণ স্বাধীনভাবে কাজ করে।

আমরা try_stream! ব্যবহার করে output স্ট্রিম তৈরি করি, যা নির্দেশ করে যে স্ট্রিমটি ত্রুটিও ফেরত দিতে পারে।

সার্ভার চালু করুন

এই পদ্ধতিটি প্রয়োগ করার পরে, আমাদের একটি gRPC সার্ভারও চালু করতে হবে যাতে ক্লায়েন্টরা আমাদের পরিষেবাটি ব্যবহার করতে পারে। main() ফাংশনটি পূরণ করুন।

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "[::1]:10000".parse().unwrap();
    println!("RouteGuideServer listening on: {addr}");
    let route_guide = RouteGuideService {
        features: load(),
    };
    let svc = RouteGuideServer::new(route_guide);
    Server::builder().add_service(svc).serve(addr).await?;
    Ok(())
}

main() ফাংশনে যা ঘটছে, তা ধাপে ধাপে নিচে দেওয়া হলো:

  1. ক্লায়েন্টের অনুরোধ শোনার জন্য আমরা যে পোর্টটি ব্যবহার করতে চাই তা নির্দিষ্ট করুন।
  2. ফিচার লোড করে একটি RouteGuideService তৈরি করুন
  3. আমাদের তৈরি করা সার্ভিসটি ব্যবহার করে RouteGuideServer::new() এর মাধ্যমে gRPC সার্ভারের একটি ইনস্ট্যান্স তৈরি করুন।
  4. আমাদের পরিষেবা বাস্তবায়নটি gRPC সার্ভারে নিবন্ধন করুন।
  5. প্রসেসটি কিল না হওয়া পর্যন্ত ব্লকিং ওয়েট করার জন্য, আমাদের পোর্ট ডিটেইলস সহ সার্ভারে serve() কল করুন।

৬. ক্লায়েন্ট তৈরি করুন

এই অংশে, আমরা src/client/client.rs এ থাকা আমাদের RouteGuide সার্ভিসের জন্য একটি রাস্ট ক্লায়েন্ট তৈরি করা দেখব।

প্রথমে, তৈরি করা কোডটিকে স্কোপের মধ্যে আনুন।

mod grpc_pb {
    grpc::include_generated_proto!("generated", "routeguide");
}

use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};

কল পরিষেবা পদ্ধতি

এবার দেখা যাক আমরা কীভাবে আমাদের সার্ভিস মেথডগুলোকে কল করি। gRPC-Rust-এ, RPC-গুলো ব্লকিং/সিঙ্ক্রোনাস মোডে কাজ করে, যার মানে হলো RPC কলটি সার্ভারের প্রতিক্রিয়ার জন্য অপেক্ষা করে এবং একটি প্রতিক্রিয়া বা একটি ত্রুটি ফেরত দেয়।

সার্ভার-সাইড স্ট্রিমিং RPC

এখানেই আমরা সার্ভার-সাইড স্ট্রিমিং মেথড ListFeatures কল করি, যা ভৌগোলিক Feature অবজেক্টের একটি স্ট্রিম রিটার্ন করে।

async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let rectangle = proto!(Rectangle {
        lo: proto!(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        hi: proto!(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    });

    let mut stream = client
        .list_features(Request::new(rectangle))
        .await?
        .into_inner();

    while let Some(feature) = stream.message().await? {
        println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
            feature.name(),
            feature.location().latitude(),
            feature.location().longitude());
        }
    Ok(())
}

আমরা মেথডটিতে একটি রিকোয়েস্ট পাঠাই এবং তার বিনিময়ে ListFeaturesStream এর একটি ইনস্ট্যান্স ফেরত পাই। ক্লায়েন্ট সার্ভারের রেসপন্সগুলো পড়ার জন্য ListFeaturesStream স্ট্রিমটি ব্যবহার করতে পারে। আমরা ListFeaturesStream এর message() মেথডটি ব্যবহার করে সার্ভারের রেসপন্সগুলো একটি রেসপন্স প্রোটোকল বাফার অবজেক্টে (এই ক্ষেত্রে একটি Feature ) বারবার পড়তে থাকি, যতক্ষণ না আর কোনো মেসেজ অবশিষ্ট থাকে।

ক্লায়েন্ট-সাইড স্ট্রিমিং আরপিসি

এখানে record_route এর জন্য, আমরা পয়েন্টের একটি ভেক্টরকে একটি স্ট্রিমে পরিণত করি। এরপর আমরা এই স্ট্রিমটিকে একটি রিকোয়েস্ট হিসেবে record_route() এ পাঠাই এবং সার্ভার দ্বারা স্ট্রিমটি সম্পূর্ণরূপে প্রসেস হওয়ার পর একটিমাত্র RouteSummary রেসপন্স পাই।

async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let mut rng = rand::rng();
    let point_count: i32 = rng.random_range(2..100);

    let mut points = vec![];
    for _ in 0..=point_count {
        points.push(random_point(&mut rng))
    }

    println!("Traversing {} points", points.len());
    let request = Request::new(tokio_stream::iter(points));

    match client.record_route(request).await {
        Ok(response) => {
            let response = response.into_inner();
            println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
        Err(e) => println!("something went wrong: {e:?}"),
    }

    Ok(())
}

দ্বিমুখী স্ট্রিমিং আরপিসি

অবশেষে, চলুন আমাদের দ্বিমুখী স্ট্রিমিং RPC RouteChat() মেথডটি দেখি। আমরা এই মেথডে একটি স্ট্রিম রিকোয়েস্ট পাঠাই, যাতে আমরা লিখি এবং বিনিময়ে একটি স্ট্রিম পাই যেখান থেকে আমরা মেসেজ পড়তে পারি। এবার আমরা আমাদের মেথডের স্ট্রিমের মাধ্যমে ভ্যালুগুলো ফেরত পাঠাই, যখন সার্ভার তখনও তাদের মেসেজ স্ট্রিমে মেসেজ লিখতে থাকে।

async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
    let start = time::Instant::now();
    let outbound = async_stream::stream! {
        let mut interval = time::interval(Duration::from_secs(1));
        for _ in 0..10 {
            let time = interval.tick().await;
            let elapsed = time.duration_since(start);
            let note = proto!(RouteNote {
                location: proto!(Point {
                    latitude: 409146138 + elapsed.as_secs() as i32,
                    longitude: -746188906,
                }),
                message: format!("at {elapsed:?}"),
            });
            yield note;
        }
    };
    let response = client.route_chat(Request::new(outbound)).await?;
    let mut inbound = response.into_inner();
    while let Some(note) = inbound.message().await? {
        println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
            note.location().latitude(),
            note.location().longitude(),
            note.message());
        }
    Ok(())
}

যদিও উভয় পক্ষই একে অপরের বার্তাগুলো লেখার ক্রমানুসারে পাবে, ক্লায়েন্ট এবং সার্ভার উভয়ই যেকোনো ক্রমে পড়তে ও লিখতে পারে — স্ট্রিমগুলো সম্পূর্ণ স্বাধীনভাবে কাজ করে।

কল হেল্পার পদ্ধতি

সার্ভিস মেথড কল করার জন্য, আমাদের প্রথমে সার্ভারের সাথে যোগাযোগের জন্য একটি চ্যানেল তৈরি করতে হবে। এটি তৈরি করার জন্য, প্রথমে একটি এন্ডপয়েন্ট তৈরি করতে হবে, সেই এন্ডপয়েন্টে কানেক্ট করতে হবে এবং কানেক্ট করার সময় তৈরি হওয়া চ্যানেলটি RouteGuideClient::new() ফাংশনে নিম্নোক্তভাবে পাস করতে হবে:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel); 
    Ok(())
}

main() ফাংশনে, আমরা এইমাত্র তৈরি করা মেথডগুলো এক্সিকিউট করুন।

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create endpoint to connect to
    let endpoint = Endpoint::new("http://[::1]:10000")?; 
    let channel = endpoint.connect().await?;             

    // Create a new client
    let mut client = RouteGuideClient::new(channel);

    println!("\n*** SERVER STREAMING ***");
    print_features(&mut client).await?;

    println!("\n*** CLIENT STREAMING ***");
    run_record_route(&mut client).await?;

    println!("\n*** BIDIRECTIONAL STREAMING ***");
    run_route_chat(&mut client).await?;

    Ok(())
}

৭. এটি পরীক্ষা করে দেখুন

প্রথমে, আমাদের ক্লায়েন্ট এবং সার্ভার চালানোর জন্য, সেগুলোকে আমাদের ক্রেটে বাইনারি টার্গেট হিসেবে যুক্ত করতে হবে। সেই অনুযায়ী আমাদের Cargo.toml ফাইলটি সম্পাদনা করতে হবে:

[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"

[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"

যেকোনো প্রজেক্টের মতোই, আমাদের কোড ঠিকমতো কাজ করার জন্য প্রয়োজনীয় ডিপেন্ডেন্সিগুলো নিয়েও ভাবতে হবে। রাস্ট প্রজেক্টের ক্ষেত্রে, ডিপেন্ডেন্সিগুলো Cargo.toml ফাইলে থাকবে। আমরা ইতিমধ্যেই Cargo.toml ফাইলে প্রয়োজনীয় ডিপেন্ডেন্সিগুলোর তালিকা করে দিয়েছি।

তারপর, আমাদের ওয়ার্কিং ডিরেক্টরিগুলো থেকে নিম্নলিখিত কমান্ডগুলো চালান:

  1. একটি টার্মিনালে সার্ভারটি চালান:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server 
  1. অন্য একটি টার্মিনাল থেকে ক্লায়েন্টটি চালান:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client

আপনি এইরকম আউটপুট দেখতে পাবেন:

*** SERVER STREAMING ***
FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763
FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179
FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468
...
*** CLIENT STREAMING ***
Traversing 86 points
SUMMARY: Feature Count = 0, Distance = 803709356

*** BIDIRECTIONAL STREAMING ***
Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs"
Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s"
Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"

৮. এরপর কী?