১. ভূমিকা
এই কোডল্যাবে, আপনি gRPC-Java ব্যবহার করে একটি ক্লায়েন্ট ও সার্ভার তৈরি করবেন, যা জাভাতে লেখা একটি রাউট-ম্যাপিং অ্যাপ্লিকেশনের ভিত্তি তৈরি করবে।
এই টিউটোরিয়ালটি শেষ করার পর, আপনি এমন একটি ক্লায়েন্ট তৈরি করতে পারবেন যা gRPC ব্যবহার করে একটি রিমোট সার্ভারের সাথে সংযোগ স্থাপন করে ক্লায়েন্টের রুটের ফিচারগুলো সম্পর্কে তথ্য সংগ্রহ করতে, ক্লায়েন্টের রুটের একটি সারাংশ তৈরি করতে এবং সার্ভার ও অন্যান্য ক্লায়েন্টদের সাথে ট্র্যাফিক আপডেটের মতো রুটের তথ্য আদান-প্রদান করতে পারবে।
সার্ভিসটি একটি প্রোটোকল বাফারস ফাইলে সংজ্ঞায়িত করা আছে, যা ক্লায়েন্ট এবং সার্ভারের জন্য বয়লারপ্লেট কোড তৈরি করতে ব্যবহৃত হবে, যাতে তারা একে অপরের সাথে যোগাযোগ করতে পারে। এর ফলে ঐ কার্যকারিতাটি বাস্তবায়নে আপনার সময় ও শ্রম বাঁচবে।
এই জেনারেট করা কোডটি শুধু সার্ভার ও ক্লায়েন্টের মধ্যকার যোগাযোগের জটিলতাই নয়, ডেটার সিরিয়ালাইজেশন এবং ডিসিরিয়ালাইজেশনও সামলে নেয়।
আপনি যা শিখবেন
- সার্ভিস এপিআই সংজ্ঞায়িত করতে প্রোটোকল বাফার কীভাবে ব্যবহার করবেন
- স্বয়ংক্রিয় কোড জেনারেশন ব্যবহার করে প্রোটোকল বাফারস ডেফিনিশন থেকে কীভাবে একটি gRPC-ভিত্তিক ক্লায়েন্ট এবং সার্ভার তৈরি করা যায়।
- gRPC ব্যবহার করে ক্লায়েন্ট-সার্ভার স্ট্রিমিং যোগাযোগ সম্পর্কে ধারণা।
এই কোডল্যাবটি সেইসব জাভা ডেভেলপারদের জন্য তৈরি করা হয়েছে যারা gRPC-তে নতুন অথবা এর বিষয়ে নিজেদের জ্ঞান ঝালিয়ে নিতে চান, কিংবা যারা ডিস্ট্রিবিউটেড সিস্টেম তৈরিতে আগ্রহী। এর জন্য gRPC-এর কোনো পূর্ব অভিজ্ঞতার প্রয়োজন নেই।
২. শুরু করার আগে
পূর্বশর্ত
- জেডিকে সংস্করণ ২৪।
কোডটি নিন
যাতে আপনাকে একেবারে গোড়া থেকে শুরু করতে না হয়, সেজন্য এই কোডল্যাবটি অ্যাপ্লিকেশনটির সোর্স কোডের একটি কাঠামো প্রদান করে, যা আপনাকে সম্পূর্ণ করতে হবে। নিম্নলিখিত ধাপগুলো আপনাকে দেখাবে কীভাবে অ্যাপ্লিকেশনটি শেষ করতে হয়, যার মধ্যে প্রোটোকল বাফার কম্পাইলার প্লাগইন ব্যবহার করে বয়লারপ্লেট gRPC কোড তৈরি করার পদ্ধতিও অন্তর্ভুক্ত রয়েছে।
প্রথমে, কোডল্যাব ওয়ার্কিং ডিরেক্টরি তৈরি করুন এবং তার ভেতরে যান:
mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started
কোডল্যাবটি ডাউনলোড এবং এক্সট্র্যাক্ট করুন:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-java-streaming/start_here
বিকল্পভাবে, আপনি শুধু কোডল্যাব ডিরেক্টরি সম্বলিত .zip ফাইলটি ডাউনলোড করে ম্যানুয়ালি আনজিপ করতে পারেন।
আপনি যদি ইমপ্লিমেন্টেশন টাইপ করা এড়াতে চান, তাহলে সম্পূর্ণ সোর্স কোডটি গিটহাবে পাওয়া যাবে ।
৩. বার্তা এবং পরিষেবাগুলি সংজ্ঞায়িত করুন
আপনার প্রথম পদক্ষেপ হলো প্রোটোকল বাফার ব্যবহার করে অ্যাপ্লিকেশনটির gRPC সার্ভিস, এর RPC মেথড এবং এর রিকোয়েস্ট ও রেসপন্স মেসেজ টাইপগুলো সংজ্ঞায়িত করা। আপনার সার্ভিসটি প্রদান করবে:
-
ListFeatures,RecordRoute, এবংRouteChatনামক RPC মেথডগুলো, যেগুলো সার্ভার ইমপ্লিমেন্ট করে এবং ক্লায়েন্ট কল করে। -
Point,Feature,Rectangle,RouteNoteএবংRouteSummaryহলো মেসেজ টাইপ, যেগুলো হলো ডেটা স্ট্রাকচার এবং উপরের মেথডগুলো কল করার সময় ক্লায়েন্ট ও সার্ভারের মধ্যে আদান-প্রদান করা হয়।
প্রোটোকল বাফারগুলো সাধারণত প্রোটোবাফ নামে পরিচিত। gRPC পরিভাষা সম্পর্কে আরও তথ্যের জন্য, gRPC-এর মূল ধারণা, স্থাপত্য এবং জীবনচক্র দেখুন।
এই RPC মেথড এবং এর মেসেজ টাইপগুলো প্রদত্ত সোর্স কোডের proto/routeguide/route_guide.proto ফাইলে সংজ্ঞায়িত করা থাকবে।
চলুন একটি route_guide.proto ফাইল তৈরি করি।
এই উদাহরণে যেহেতু আমরা জাভা কোড তৈরি করছি, তাই আমরা আমাদের .proto ফাইলে একটি java_package ফাইল অপশন নির্দিষ্ট করেছি:
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
বার্তার প্রকারগুলি সংজ্ঞায়িত করুন
সোর্স কোডের proto/routeguide/route_guide.proto ফাইলে, প্রথমে Point মেসেজ টাইপটি সংজ্ঞায়িত করুন। একটি Point মানচিত্রে একটি অক্ষাংশ-দ্রাঘিমাংশ স্থানাঙ্ক জোড়াকে বোঝায়। এই কোডল্যাবের জন্য, স্থানাঙ্ক হিসেবে পূর্ণসংখ্যা ব্যবহার করুন:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
1 এবং 2 সংখ্যা দুটি হলো message কাঠামোর প্রতিটি ফিল্ডের অনন্য শনাক্তকরণ নম্বর।
এরপর, Feature মেসেজ টাইপটি সংজ্ঞায়িত করুন। একটি Feature , Point দ্বারা নির্দিষ্ট কোনো অবস্থানে থাকা কোনো কিছুর নাম বা ডাক ঠিকানার জন্য একটি string ফিল্ড ব্যবহার করে।
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
যাতে একটি এলাকার মধ্যে একাধিক পয়েন্ট কোনো ক্লায়েন্টের কাছে স্ট্রিম করা যায়, তার জন্য আপনার একটি Rectangle মেসেজের প্রয়োজন হবে যা একটি অক্ষাংশ-দ্রাঘিমাংশ আয়তক্ষেত্রকে প্রতিনিধিত্ব করে, যা lo এবং hi নামক দুটি তির্যকভাবে বিপরীত বিন্দু দ্বারা উপস্থাপিত হয়।
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
এছাড়াও, একটি RouteNote বার্তা যা একটি নির্দিষ্ট পয়েন্টে থাকাকালীন পাঠানো বার্তার প্রতিনিধিত্ব করে:
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
অবশেষে, আপনার একটি RouteSummary মেসেজের প্রয়োজন হবে। এই মেসেজটি একটি RecordRoute RPC-এর প্রতিক্রিয়ায় পাওয়া যায়, যা পরবর্তী বিভাগে ব্যাখ্যা করা হয়েছে। এতে প্রাপ্ত স্বতন্ত্র পয়েন্টের সংখ্যা, শনাক্তকৃত ফিচারের সংখ্যা এবং প্রতিটি পয়েন্টের মধ্যবর্তী দূরত্বের ক্রমসঞ্চয়ী যোগফল হিসেবে অতিক্রান্ত মোট দূরত্ব উল্লেখ থাকে।
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
পরিষেবা পদ্ধতিগুলি সংজ্ঞায়িত করুন
একটি সার্ভিস সংজ্ঞায়িত করতে, আপনাকে আপনার .proto ফাইলে একটি নামযুক্ত সার্ভিস উল্লেখ করতে হয়। route_guide.proto ফাইলটিতে RouteGuide নামের একটি service স্ট্রাকচার রয়েছে, যা অ্যাপ্লিকেশনটির সার্ভিস দ্বারা প্রদত্ত এক বা একাধিক মেথড সংজ্ঞায়িত করে।
আপনার সার্ভিস ডেফিনিশনের ভেতরে যখন আপনি RPC মেথডগুলো সংজ্ঞায়িত করেন, তখন সেগুলোর রিকোয়েস্ট এবং রেসপন্স টাইপ নির্দিষ্ট করে দেন। কোডল্যাবের এই অংশে, চলুন সংজ্ঞায়িত করা যাক:
তালিকা বৈশিষ্ট্য
প্রদত্ত Rectangle মধ্যে উপলব্ধ Feature অবজেক্টগুলো সংগ্রহ করে। যেহেতু আয়তক্ষেত্রটি একটি বড় এলাকা জুড়ে থাকতে পারে এবং এতে বিপুল সংখ্যক ফিচার থাকতে পারে, তাই ফলাফলগুলো একবারে ফেরত না দিয়ে ধারাবাহিকভাবে পাঠানো হয়।
এই অ্যাপ্লিকেশনের জন্য, আপনি একটি সার্ভার-সাইড স্ট্রিমিং RPC ব্যবহার করবেন: ক্লায়েন্ট সার্ভারে একটি অনুরোধ পাঠায় এবং ক্রমানুসারে বার্তা পড়ার জন্য একটি স্ট্রিম ফেরত পায়। ক্লায়েন্ট ফেরত আসা স্ট্রিমটি থেকে ততক্ষণ পর্যন্ত পড়তে থাকে যতক্ষণ না আর কোনো বার্তা অবশিষ্ট থাকে। আমাদের উদাহরণে যেমনটি দেখতে পাচ্ছেন, রেসপন্স টাইপের আগে 'stream' কীওয়ার্ডটি বসিয়ে একটি সার্ভার-সাইড স্ট্রিমিং পদ্ধতি নির্দিষ্ট করা হয়।
rpc ListFeatures(Rectangle) returns (stream Feature) {}
রেকর্ডরুট
ভ্রমণরত কোনো রুটের উপর থাকা পয়েন্টসমূহের একটি ধারা গ্রহণ করে এবং ভ্রমণ সম্পন্ন হলে একটি RouteSummary ফেরত দেয়।
এই ক্ষেত্রে একটি ক্লায়েন্ট-সাইড স্ট্রিমিং RPC উপযুক্ত: ক্লায়েন্ট একটি প্রদত্ত স্ট্রিম ব্যবহার করে ধারাবাহিকভাবে বার্তা লেখে এবং সার্ভারে পাঠায়। ক্লায়েন্টের বার্তা লেখা শেষ হয়ে গেলে, এটি সার্ভারের সব বার্তা পড়া এবং প্রতিক্রিয়া ফেরত পাঠানোর জন্য অপেক্ষা করে। রিকোয়েস্ট টাইপের আগে 'stream' কীওয়ার্ডটি বসিয়ে একটি ক্লায়েন্ট-সাইড স্ট্রিমিং পদ্ধতি নির্দিষ্ট করা হয়।
rpc RecordRoute(stream Point) returns (RouteSummary) {}
রুটচ্যাট
একটি রুট অতিক্রম করার সময় পাঠানো RouteNotes এর একটি ধারা গ্রহণ করে, এবং একই সাথে অন্যান্য RouteNotes (যেমন অন্য ব্যবহারকারীদের কাছ থেকে) গ্রহণ করে।
দ্বিমুখী স্ট্রিমিং-এর জন্য এটি একদম সঠিক একটি ব্যবহার। একটি দ্বিমুখী স্ট্রিমিং RPC-তে উভয় পক্ষই একটি রিড-রাইট স্ট্রিম ব্যবহার করে ধারাবাহিকভাবে বার্তা পাঠায়। দুটি স্ট্রিম স্বাধীনভাবে কাজ করে, তাই ক্লায়েন্ট এবং সার্ভার তাদের পছন্দমতো যেকোনো ক্রমে পড়তে ও লিখতে পারে: উদাহরণস্বরূপ, সার্ভার তার প্রতিক্রিয়া লেখার আগে ক্লায়েন্টের সমস্ত বার্তা পাওয়ার জন্য অপেক্ষা করতে পারে, অথবা এটি পর্যায়ক্রমে একটি বার্তা পড়ে তারপর একটি বার্তা লিখতে পারে, কিংবা পড়া ও লেখার অন্য কোনো সংমিশ্রণ ব্যবহার করতে পারে। প্রতিটি স্ট্রিমে বার্তাগুলোর ক্রম সংরক্ষিত থাকে। অনুরোধ এবং প্রতিক্রিয়া উভয়ের আগে 'stream' কীওয়ার্ডটি বসিয়ে এই ধরনের পদ্ধতি নির্দিষ্ট করা হয়।
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
৪. ক্লায়েন্ট এবং সার্ভার কোড তৈরি করুন
এরপরে আমাদের .proto সার্ভিস ডেফিনিশন থেকে gRPC ক্লায়েন্ট এবং সার্ভার ইন্টারফেস তৈরি করতে হবে। আমরা একটি বিশেষ gRPC জাভা প্লাগইন সহ প্রোটোকল বাফার কম্পাইলার protoc ব্যবহার করে এটি করে থাকি। gRPC সার্ভিস তৈরি করার জন্য আপনাকে proto3 কম্পাইলার ব্যবহার করতে হবে (যা proto2 এবং proto3 উভয় সিনট্যাক্সই সমর্থন করে)।
Gradle বা Maven ব্যবহার করার সময়, protoc বিল্ড প্লাগইনটি বিল্ডের অংশ হিসেবে প্রয়োজনীয় কোড তৈরি করতে পারে। আপনার নিজের .proto ফাইলগুলো থেকে কীভাবে কোড তৈরি করবেন, তা জানতে আপনি grpc-java README দেখতে পারেন।
আমরা গ্রেডল কনফিগারেশন প্রদান করেছি।
streaming-grpc-java-getting-started ডিরেক্টরি থেকে প্রবেশ করুন
$ chmod +x gradlew $ ./gradlew generateProto
নিম্নলিখিত ক্লাসগুলি আমাদের পরিষেবা সংজ্ঞা থেকে তৈরি হয় ( build/generated/sources/proto/main/java এর অধীনে):
- প্রতিটি মেসেজ টাইপের জন্য একটি করে:
Feature.java,Rectangle.java, ...যেগুলোতে আমাদের রিকোয়েস্ট এবং রেসপন্স মেসেজ টাইপগুলোকে পপুলেট, সিরিয়ালাইজ এবং রিট্রিভ করার জন্য প্রয়োজনীয় সমস্ত প্রোটোকল বাফার কোড রয়েছে। -
RouteGuideGrpc.javaফাইলে (অন্যান্য কিছু দরকারি কোডের সাথে)RouteGuideসার্ভারগুলোর ইমপ্লিমেন্ট করার জন্য একটি বেস ক্লাস,RouteGuideGrpc.RouteGuideImplBase, রয়েছে, যেখানে ক্লায়েন্টদের ব্যবহারের জন্যRouteGuideসার্ভিস এবং স্টাব ক্লাসগুলোতে সংজ্ঞায়িত সমস্ত মেথড অন্তর্ভুক্ত আছে।
৫. পরিষেবাটি বাস্তবায়ন করুন
প্রথমে দেখা যাক আমরা কীভাবে একটি RouteGuide সার্ভার তৈরি করি। আমাদের RouteGuide সার্ভিসকে তার কাজ করানোর জন্য দুটি অংশ রয়েছে:
- আমাদের সার্ভিস ডেফিনিশন থেকে তৈরি সার্ভিস ইন্টারফেসটি ইমপ্লিমেন্ট করা: অর্থাৎ আমাদের সার্ভিসের আসল 'কাজ'টি করা।
- ক্লায়েন্টদের কাছ থেকে অনুরোধ শোনার জন্য এবং সেগুলোকে সঠিক পরিষেবা বাস্তবায়নে পাঠানোর জন্য একটি gRPC সার্ভার চালানো হচ্ছে।
রুটগাইড বাস্তবায়ন করুন
আমরা একটি RouteGuideService ক্লাস ইমপ্লিমেন্ট করব যা জেনারেট করা RouteGuideGrpc.RouteGuideImplBase ক্লাসকে এক্সটেন্ড করবে। ইমপ্লিমেন্টেশনটি দেখতে এইরকম হবে।
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
...
}
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
...
}
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
...
}
আসুন প্রতিটি RPC বাস্তবায়ন বিস্তারিতভাবে দেখি।
সার্ভার-সাইড স্ট্রিমিং RPC
এরপর চলুন আমাদের একটি স্ট্রিমিং RPC দেখি। ListFeatures হলো একটি সার্ভার-সাইড স্ট্রিমিং RPC, তাই আমাদের ক্লায়েন্টের কাছে একাধিক Features ফেরত পাঠাতে হবে।
private final Collection<Feature> features;
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
সাধারণ RPC-এর মতোই, এই মেথডটি একটি রিকোয়েস্ট অবজেক্ট (সেই Rectangle যেখানে আমাদের ক্লায়েন্ট Features খুঁজতে চায়) এবং একটি StreamObserver রেসপন্স অবজারভার গ্রহণ করে।
এবার, ক্লায়েন্টকে ফেরত পাঠানোর জন্য আমাদের যতগুলো Feature অবজেক্ট প্রয়োজন, আমরা ততগুলো সংগ্রহ করি (এক্ষেত্রে, আমরা সেগুলোকে সার্ভিসের ফিচার কালেকশন থেকে বেছে নিই, আমাদের রিকোয়েস্ট Rectangle মধ্যে আছে কি না তার উপর ভিত্তি করে), এবং রেসপন্স অবজারভারের onNext() মেথড ব্যবহার করে একে একে সেগুলোকে সেখানে লিখে দিই। সবশেষে, আমাদের সাধারণ RPC-এর মতোই, রেসপন্স লেখা শেষ হয়েছে তা gRPC-কে জানানোর জন্য আমরা রেসপন্স অবজারভারের onCompleted() মেথডটি ব্যবহার করি।
ক্লায়েন্ট-সাইড স্ট্রিমিং আরপিসি
এবার চলুন আরেকটু জটিল একটি বিষয় দেখি: ক্লায়েন্ট-সাইড স্ট্রিমিং মেথড RecordRoute() , যেখানে আমরা ক্লায়েন্টের কাছ থেকে Points এর একটি স্ট্রিম গ্রহণ করি এবং তাদের ভ্রমণ সম্পর্কিত তথ্যসহ একটিমাত্র RouteSummary ফেরত দিই।
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
যেমনটি দেখতে পাচ্ছেন, আগের মেথড টাইপগুলোর মতোই আমাদের মেথডটি একটি StreamObserver responseObserver প্যারামিটার গ্রহণ করে, কিন্তু এবার এটি ক্লায়েন্টের Points লেখার জন্য একটি StreamObserver রিটার্ন করে।
মেথড বডির ভিতরে আমরা রিটার্ন করার জন্য একটি অ্যানোনিমাস StreamObserver ইনস্ট্যানশিয়েট করি, যার মধ্যে আমরা:
- ক্লায়েন্ট যখনই মেসেজ স্ট্রিমে একটি
Pointলেখে, তখন ফিচার এবং অন্যান্য তথ্য পাওয়ার জন্যonNext()মেথডটি ওভাররাইড করুন। - আমাদের
RouteSummaryডেটা দিয়ে পূর্ণ করতে ও তৈরি করতেonCompleted()মেথডটি (যা ক্লায়েন্ট মেসেজ লেখা শেষ করলে কল করা হয়) ওভাররাইড করুন। এরপর আমরা আমাদেরRouteSummaryব্যবহার করে মেথডটির নিজস্ব রেসপন্স অবজারভারেরonNext()কল করি এবং সবশেষে সার্ভার সাইড থেকে কলটি শেষ করার জন্য এরonCompleted()মেথডটি আবার কল করি।
দ্বিমুখী স্ট্রিমিং আরপিসি
অবশেষে, চলুন আমাদের দ্বিমুখী স্ট্রিমিং RPC RouteChat() দেখি।
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
আমাদের ক্লায়েন্ট-সাইড স্ট্রিমিং উদাহরণের মতোই, আমরা একটি StreamObserver গ্রহণ করি এবং ফেরত দিই, তবে এবার আমরা আমাদের মেথডের রেসপন্স অবজারভারের মাধ্যমে ভ্যালুগুলো ফেরত দিই, যখন ক্লায়েন্ট তখনও তাদের মেসেজ স্ট্রিমে মেসেজ লিখতে থাকে। এখানে পড়া এবং লেখার সিনট্যাক্স আমাদের ক্লায়েন্ট-স্ট্রিমিং এবং সার্ভার-স্ট্রিমিং মেথডগুলোর মতোই হুবহু একই। যদিও উভয় পক্ষই একে অপরের মেসেজগুলো লেখার ক্রমানুসারে পাবে, ক্লায়েন্ট এবং সার্ভার উভয়ই যেকোনো ক্রমে পড়তে এবং লিখতে পারে — স্ট্রিমগুলো সম্পূর্ণ স্বাধীনভাবে কাজ করে।
সার্ভার চালু করুন
আমাদের সমস্ত মেথড ইমপ্লিমেন্ট করার পরে, আমাদের একটি gRPC সার্ভারও চালু করতে হবে যাতে ক্লায়েন্টরা আমাদের সার্ভিসটি ব্যবহার করতে পারে। নিচের কোড স্নিপেটটিতে দেখানো হয়েছে যে আমরা আমাদের RouteGuide সার্ভিসের জন্য এটি কীভাবে করি:
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
}
যেমনটি দেখতে পাচ্ছেন, আমরা একটি ServerBuilder ব্যবহার করে আমাদের সার্ভার তৈরি ও চালু করি।
এটি করার জন্য, আমরা:
- বিল্ডারের
forPort()মেথড ব্যবহার করে ক্লায়েন্ট রিকোয়েস্ট শোনার জন্য প্রয়োজনীয় অ্যাড্রেস ও পোর্ট নির্দিষ্ট করুন। - আমাদের সার্ভিস ইমপ্লিমেন্টেশন ক্লাস
RouteGuideServiceএর একটি ইনস্ট্যান্স তৈরি করুন এবং সেটিকে বিল্ডারেরaddService()মেথডে পাস করুন। - আমাদের সার্ভিসের জন্য একটি আরপিসি সার্ভার তৈরি ও চালু করতে বিল্ডারে
build()এবংstart()কল করুন।
যেহেতু ServerBuilder-এ পোর্টটি আগে থেকেই অন্তর্ভুক্ত থাকে, তাই আমরা শুধুমাত্র লগিংয়ের জন্য পোর্ট পাস করি।
৬. ক্লায়েন্ট তৈরি করুন
এই অংশে, আমরা আমাদের RouteGuide সার্ভিসের জন্য একটি ক্লায়েন্ট তৈরি করা দেখব। আমাদের সম্পূর্ণ উদাহরণ ক্লায়েন্ট কোডটি আপনি ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java -এ দেখতে পারেন।
একটি স্টাব ইনস্ট্যানশিয়েট করুন
সার্ভিস মেথড কল করার জন্য, আমাদের প্রথমে একটি স্টাব , বা বলা ভালো, দুটি স্টাব তৈরি করতে হবে:
- একটি ব্লকিং/সিঙ্ক্রোনাস স্টাব: এর অর্থ হলো, RPC কলটি সার্ভারের প্রতিক্রিয়ার জন্য অপেক্ষা করে এবং হয় একটি প্রতিক্রিয়া ফেরত দেয় অথবা একটি এক্সেপশন তৈরি করে।
- একটি নন-ব্লকিং/অ্যাসিঙ্ক্রোনাস স্টাব যা সার্ভারে নন-ব্লকিং কল করে, যেখানে প্রতিক্রিয়াটি অ্যাসিঙ্ক্রোনাসভাবে ফেরত আসে। আপনি শুধুমাত্র একটি অ্যাসিঙ্ক্রোনাস স্টাব ব্যবহার করে নির্দিষ্ট ধরণের স্ট্রিমিং কল করতে পারেন।
প্রথমে আমাদের স্টাবের জন্য একটি gRPC চ্যানেল তৈরি করতে হবে, যেখানে আমরা যে সার্ভার অ্যাড্রেস এবং পোর্টে সংযোগ করতে চাই তা উল্লেখ করতে হবে:
public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
target = args[0];
}
List<Feature> features;
try {
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
} catch (IOException ex) {
ex.printStackTrace();
return;
}
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
// Looking for features between 40, -75 and 42, -73.
client.listFeatures(400000000, -750000000, 420000000, -730000000);
// Record a few randomly selected points from the features file.
client.recordRoute(features, 10);
// Send and receive some notes.
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
client.warning("routeChat did not finish within 1 minutes");
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
চ্যানেল তৈরি করার জন্য আমরা একটি ManagedChannelBuilder ব্যবহার করি।
এখন আমরা আমাদের .proto থেকে তৈরি করা RouteGuideGrpc ক্লাসে দেওয়া newStub এবং newBlockingStub মেথডগুলো ব্যবহার করে চ্যানেলটির মাধ্যমে আমাদের স্টাবগুলো তৈরি করতে পারি।
public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
মনে রাখবেন, যদি এটি ব্লকিং না হয়, তবে এটি অ্যাসিঙ্ক।
কল পরিষেবা পদ্ধতি
এবার দেখা যাক আমরা কীভাবে আমাদের সার্ভিস মেথডগুলোকে কল করি। উল্লেখ্য যে, ব্লকিং স্টাব থেকে তৈরি করা যেকোনো RPC ব্লকিং/সিঙ্ক্রোনাস মোডে কাজ করবে, যার অর্থ হলো RPC কলটি সার্ভারের প্রতিক্রিয়ার জন্য অপেক্ষা করে এবং একটি প্রতিক্রিয়া বা একটি ত্রুটি ফেরত দেবে।
সার্ভার-সাইড স্ট্রিমিং RPC
এরপরে, আসুন ListFeatures এর একটি সার্ভার-সাইড স্ট্রিমিং কল দেখি, যা ভৌগোলিক Feature এর একটি স্ট্রিম রিটার্ন করে:
Rectangle request = Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
আপনি দেখতেই পাচ্ছেন, এটি Getting_Started_With_gRPC_Java কোডল্যাবে দেখা সাধারণ ইউনারি RPC-এর মতোই, তবে পার্থক্য হলো, একটিমাত্র Feature রিটার্ন করার পরিবর্তে, এই মেথডটি একটি Iterator রিটার্ন করে যা ক্লায়েন্ট রিটার্ন করা সমস্ত Features পড়ার জন্য ব্যবহার করতে পারে।
ক্লায়েন্ট-সাইড স্ট্রিমিং আরপিসি
এবার আরেকটু জটিল একটি বিষয়: ক্লায়েন্ট-সাইড স্ট্রিমিং মেথড RecordRoute , যেখানে আমরা সার্ভারে Points এর একটি স্ট্রিম পাঠাই এবং তার বিনিময়ে একটিমাত্র RouteSummary ফেরত পাই। এই মেথডটির জন্য আমাদের অ্যাসিঙ্ক্রোনাস স্টাব ব্যবহার করতে হবে। আপনি যদি ইতিমধ্যেই "সার্ভার তৈরি করা" পড়ে থাকেন, তবে এর কিছু অংশ আপনার কাছে খুব পরিচিত মনে হতে পারে — অ্যাসিঙ্ক্রোনাস স্ট্রিমিং RPC-গুলো উভয় দিকেই একই রকমভাবে প্রয়োগ করা হয়।
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
যেমনটি দেখতে পাচ্ছেন, এই মেথডটি কল করার জন্য আমাদের একটি StreamObserver তৈরি করতে হবে, যেটি সার্ভারের RouteSummary রেসপন্স দিয়ে কল করার জন্য একটি বিশেষ ইন্টারফেস ইমপ্লিমেন্ট করে। আমাদের StreamObserver এ আমরা যা করি তা হলো:
- সার্ভার যখন মেসেজ স্ট্রিমে একটি
RouteSummaryলেখে, তখন ফেরত আসা তথ্য প্রিন্ট করার জন্যonNext()মেথডটি ওভাররাইড করুন। -
onCompleted()মেথডটি (যা সার্ভার তার নিজের দিকের কলটি সম্পন্ন করলে চালু হয়) একটিCountDownLatchরিডিউস করার জন্য ওভাররাইড করুন, যাতে আমরা পরীক্ষা করে দেখতে পারি যে সার্ভার লেখা শেষ করেছে কিনা।
এরপর আমরা অ্যাসিঙ্ক্রোনাস স্টাবের recordRoute() মেথডে StreamObserver টি পাস করি এবং সার্ভারে পাঠানোর জন্য আমাদের Points লেখার উদ্দেশ্যে আমাদের নিজস্ব StreamObserver রিকোয়েস্ট অবজারভারটি ফেরত পাই। Points লেখা শেষ হয়ে গেলে, আমরা রিকোয়েস্ট অবজারভারের onCompleted() মেথড ব্যবহার করে gRPC-কে জানিয়ে দিই যে ক্লায়েন্ট সাইডে আমাদের লেখা শেষ হয়েছে। কাজ শেষ হয়ে গেলে, সার্ভার তার দিকে কাজটি সম্পন্ন করেছে কিনা তা দেখার জন্য আমরা আমাদের CountDownLatch চেক করি।
দ্বিমুখী স্ট্রিমিং আরপিসি
অবশেষে, চলুন আমাদের দ্বিমুখী স্ট্রিমিং RPC RouteChat() দেখি।
public CountDownLatch routeChat() {
info("*** RouteChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
warning("RouteChat Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// return the latch while receiving happens asynchronously
return finishLatch;
}
আমাদের ক্লায়েন্ট-সাইড স্ট্রিমিং উদাহরণের মতোই, আমরা একটি StreamObserver রেসপন্স অবজারভার গ্রহণ ও ফেরত দিই, তবে এবার সার্ভার যখন তার মেসেজ স্ট্রিমে বার্তা লিখতে থাকে, তখন আমরা আমাদের মেথডের রেসপন্স অবজারভারের মাধ্যমে ভ্যালু পাঠাই। এখানে পড়া এবং লেখার সিনট্যাক্স আমাদের ক্লায়েন্ট-স্ট্রিমিং মেথডের মতোই হুবহু একই। যদিও উভয় পক্ষই একে অপরের বার্তাগুলো লেখার ক্রমানুসারে পাবে, ক্লায়েন্ট এবং সার্ভার উভয়ই যেকোনো ক্রমে পড়তে ও লিখতে পারে — স্ট্রিমগুলো সম্পূর্ণ স্বাধীনভাবে কাজ করে।
৭. চেষ্টা করে দেখুন!
-
start_hereডিরেক্টরি থেকে:
$ ./gradlew installDist
এটি আপনার কোড কম্পাইল করবে, সেটিকে একটি জার ফাইলে প্যাকেজ করবে এবং উদাহরণটি চালানোর জন্য স্ক্রিপ্টগুলো তৈরি করবে। স্ক্রিপ্টগুলো build/install/start_here/bin/ ডিরেক্টরিতে তৈরি হবে। স্ক্রিপ্টগুলো হলো: route-guide-server এবং route-guide-client ।
ক্লায়েন্ট চালু করার আগে সার্ভারটি চালু থাকতে হবে।
- সার্ভারটি চালান:
$ ./build/install/start_here/bin/route-guide-server
- ক্লায়েন্টটি চালান:
$ ./build/install/start_here/bin/route-guide-client
৮. এরপর কী?
- gRPC-এর পরিচিতি এবং মূল ধারণা অংশে gRPC কীভাবে কাজ করে তা জানুন।
- বেসিক টিউটোরিয়ালটি অনুসরণ করুন
- এপিআই রেফারেন্সটি অন্বেষণ করুন।