۱. مقدمه
در این آزمایشگاه کد، شما از gRPC-Java برای ایجاد یک کلاینت و سرور استفاده خواهید کرد که پایه و اساس یک برنامه مسیریابی نوشته شده در جاوا را تشکیل میدهند.
در پایان آموزش، شما یک کلاینت خواهید داشت که با استفاده از gRPC به یک سرور راه دور متصل میشود تا اطلاعاتی در مورد ویژگیهای مسیر کلاینت دریافت کند، خلاصهای از مسیر کلاینت ایجاد کند و اطلاعات مسیر مانند بهروزرسانیهای ترافیک را با سرور و سایر کلاینتها تبادل کند.
این سرویس در یک فایل Protocol Buffers تعریف شده است که برای تولید کد تکراری برای کلاینت و سرور استفاده میشود تا بتوانند با یکدیگر ارتباط برقرار کنند و در زمان و تلاش شما برای پیادهسازی آن قابلیت صرفهجویی شود.
این کد تولید شده نه تنها پیچیدگیهای ارتباط بین سرور و کلاینت، بلکه سریالسازی و از سریالزدایی دادهها را نیز برطرف میکند.
آنچه یاد خواهید گرفت
- نحوه استفاده از بافرهای پروتکل برای تعریف یک API سرویس.
- نحوه ساخت یک کلاینت و سرور مبتنی بر gRPC از تعریف Protocol Buffers با استفاده از تولید خودکار کد.
- آشنایی با ارتباطات استریمینگ کلاینت-سرور با gRPC
این آزمایشگاه کد برای توسعهدهندگان جاوا که تازه با gRPC آشنا شدهاند یا به دنبال مرور gRPC هستند، یا هر کسی که به ساخت سیستمهای توزیعشده علاقهمند است، مناسب است. هیچ تجربه قبلی gRPC لازم نیست.
۲. قبل از شروع
پیشنیازها
- JDK نسخه ۲۴
کد را دریافت کنید
برای اینکه مجبور نباشید کاملاً از ابتدا شروع کنید، این codelab چارچوبی از کد منبع برنامه را برای تکمیل شما فراهم میکند. مراحل زیر نحوه تکمیل برنامه، از جمله استفاده از افزونههای کامپایلر بافر پروتکل برای تولید کد gRPC قالببندی شده را به شما نشان میدهد.
ابتدا، دایرکتوری کاری codelab را ایجاد کنید و با دستور cd به آن وارد شوید:
mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started
کدلب را دانلود و استخراج کنید:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
| tar xvz --strip-components=4 \
grpc-codelabs-1/codelabs/grpc-java-streaming/start_here
روش دیگر این است که فایل .zip که فقط شامل دایرکتوری codelab است را دانلود کرده و به صورت دستی آن را از حالت فشرده خارج کنید.
اگر میخواهید از تایپ کردن پیادهسازی صرفنظر کنید، کد منبع تکمیلشده در گیتهاب موجود است.
۳. تعریف پیامها و خدمات
اولین قدم شما تعریف سرویس gRPC برنامه، متد RPC آن و انواع پیامهای درخواست و پاسخ آن با استفاده از Protocol Buffers است. سرویس شما موارد زیر را ارائه خواهد داد:
- متدهای RPC به نامهای
ListFeatures،RecordRouteوRouteChatکه سرور پیادهسازی میکند و کلاینت آنها را فراخوانی میکند. - انواع پیام
Point،Feature،Rectangle،RouteNoteوRouteSummaryهستند که ساختارهای دادهای هستند که هنگام فراخوانی متدهای بالا بین کلاینت و سرور رد و بدل میشوند.
بافرهای پروتکل معمولاً به عنوان protobufs شناخته میشوند. برای اطلاعات بیشتر در مورد اصطلاحات gRPC، به مفاهیم اصلی، معماری و چرخه حیات gRPC مراجعه کنید.
این متد RPC و انواع پیامهای آن، همگی در فایل proto/routeguide/route_guide.proto از کد منبع ارائه شده تعریف خواهند شد.
بیایید یک فایل route_guide.proto ایجاد کنیم.
از آنجایی که در این مثال کد جاوا تولید میکنیم، گزینه فایل java_package را در .proto خود مشخص کردهایم:
option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";
تعریف انواع پیام
در فایل proto/routeguide/route_guide.proto از کد منبع، ابتدا نوع پیام Point را تعریف کنید. یک Point نشان دهنده یک جفت مختصات طول و عرض جغرافیایی روی نقشه است. برای این codelab، از اعداد صحیح برای مختصات استفاده کنید:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
اعداد 1 و 2 شمارههای شناسایی منحصر به فرد برای هر یک از فیلدهای موجود در ساختار message هستند.
سپس، نوع پیام Feature را تعریف کنید. یک Feature از یک فیلد string برای نام یا آدرس پستی چیزی در مکانی که توسط Point مشخص شده است، استفاده میکند:
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
برای اینکه چندین نقطه در یک منطقه بتوانند به یک کلاینت ارسال شوند، به یک پیام Rectangle نیاز دارید که یک مستطیل طول و عرض جغرافیایی را نشان میدهد، که به صورت دو نقطه روبروی هم به lo hi نمایش داده میشود:
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
همچنین، یک پیام RouteNote که نشاندهنده پیامی است که در یک نقطه معین ارسال شده است:
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
در نهایت، به یک پیام RouteSummary نیاز خواهید داشت. این پیام در پاسخ به یک RecordRoute RPC دریافت میشود که در بخش بعدی توضیح داده شده است. این پیام شامل تعداد نقاط دریافت شده، تعداد عوارض شناسایی شده و کل مسافت طی شده به عنوان مجموع تجمعی فاصله بین هر نقطه است.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
تعریف روشهای سرویس
برای تعریف یک سرویس، شما یک سرویس نامگذاری شده را در فایل .proto خود مشخص میکنید. فایل route_guide.proto دارای یک ساختار service به نام RouteGuide است که یک یا چند متد ارائه شده توسط سرویس برنامه را تعریف میکند.
وقتی متدهای RPC را در تعریف سرویس خود تعریف میکنید، نوع درخواست و پاسخ آنها را مشخص میکنید. در این بخش از codelab، بیایید موارد زیر را تعریف کنیم:
ویژگیها
اشیاء Feature موجود در Rectangle داده شده را دریافت میکند. نتایج به جای اینکه به طور همزمان بازگردانده شوند، به صورت جریانی ارسال میشوند، زیرا مستطیل ممکن است ناحیه بزرگی را پوشش دهد و شامل تعداد زیادی ویژگی باشد.
برای این برنامه، شما از یک RPC استریمینگ سمت سرور استفاده خواهید کرد: کلاینت یک درخواست به سرور ارسال میکند و یک استریم برای خواندن دنباله ای از پیامها دریافت میکند. کلاینت از استریم برگشتی میخواند تا زمانی که دیگر پیامی وجود نداشته باشد. همانطور که در مثال ما میبینید، شما یک متد استریمینگ سمت سرور را با قرار دادن کلمه کلیدی stream قبل از نوع پاسخ مشخص میکنید.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
رکوردروت
جریانی از نقاط روی مسیری که پیمایش میشود را میپذیرد و پس از اتمام پیمایش، یک RouteSummary برمیگرداند.
یک RPC استریمینگ سمت کلاینت در این مورد مناسب است: کلاینت دنباله ای از پیام ها را می نویسد و آنها را دوباره با استفاده از یک استریم ارائه شده به سرور ارسال می کند. پس از اینکه کلاینت نوشتن پیام ها را تمام کرد، منتظر می ماند تا سرور همه آنها را بخواند و پاسخ خود را برگرداند. شما با قرار دادن کلمه کلیدی stream قبل از نوع درخواست، یک روش استریمینگ سمت کلاینت را مشخص می کنید.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
روتچت
جریانی از RouteNotes را که هنگام پیمایش یک مسیر ارسال میشوند، میپذیرد، در حالی که RouteNotes دیگری (مثلاً از سایر کاربران) را دریافت میکند.
این دقیقاً همان نوع کاربرد استریمینگ دوطرفه است. یک RPC استریمینگ دوطرفه که در آن هر دو طرف با استفاده از یک استریم خواندنی-نوشتنی، دنبالهای از پیامها را ارسال میکنند. این دو استریم بهطور مستقل عمل میکنند، بنابراین کلاینتها و سرورها میتوانند به هر ترتیبی که دوست دارند، بخوانند و بنویسند: برای مثال، سرور میتواند قبل از نوشتن پاسخهای خود، منتظر دریافت تمام پیامهای کلاینت بماند، یا میتواند بهطور متناوب یک پیام را بخواند و سپس یک پیام بنویسد، یا ترکیب دیگری از خواندن و نوشتن. ترتیب پیامها در هر استریم حفظ میشود. شما این نوع متد را با قرار دادن کلمه کلیدی stream قبل از درخواست و پاسخ مشخص میکنید.
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
۴. تولید کد کلاینت و سرور
در مرحله بعد باید رابطهای کلاینت و سرور gRPC را از تعریف سرویس .proto خود تولید کنیم. ما این کار را با استفاده از کامپایلر بافر پروتکل protoc با یک افزونه ویژه gRPC Java انجام میدهیم. برای تولید سرویسهای gRPC باید از کامپایلر proto3 (که از هر دو سینتکس proto2 و proto3 پشتیبانی میکند) استفاده کنید.
هنگام استفاده از Gradle یا Maven، افزونه protoc build میتواند کد لازم را به عنوان بخشی از ساخت تولید کند. برای نحوه تولید کد از فایلهای .proto خودتان، میتوانید به grpc-java README مراجعه کنید.
ما پیکربندی Gradle را ارائه دادهایم.
از دایرکتوری streaming-grpc-java-getting-started دستور زیر را وارد کنید:
$ chmod +x gradlew $ ./gradlew generateProto
کلاسهای زیر از تعریف سرویس ما (در مسیر build/generated/sources/proto/main/java ) تولید میشوند:
- یکی برای هر نوع پیام:
Feature.java،Rectangle.java, ...که شامل تمام کد بافر پروتکل برای پر کردن، سریالسازی و بازیابی انواع پیامهای درخواست و پاسخ ما است. -
RouteGuideGrpc.javaکه شامل (همراه با برخی کدهای مفید دیگر) یک کلاس پایه برای پیادهسازی سرورهایRouteGuide،RouteGuideGrpc.RouteGuideImplBase، به همراه تمام متدهای تعریف شده در سرویسRouteGuideو کلاسهای stub برای استفاده کلاینتها است.
۵. پیادهسازی سرویس
ابتدا بیایید نگاهی به نحوه ایجاد یک سرور RouteGuide بیندازیم. برای اینکه سرویس RouteGuide ما کارش را انجام دهد، دو بخش وجود دارد:
- پیادهسازی رابط سرویس تولید شده از تعریف سرویس ما: انجام "کار" واقعی سرویس ما.
- اجرای یک سرور gRPC برای گوش دادن به درخواستهای کلاینتها و ارسال آنها به پیادهسازی صحیح سرویس.
پیادهسازی RouteGuide
ما یک کلاس RouteGuideService پیادهسازی خواهیم کرد که کلاس تولید شدهی RouteGuideGrpc.RouteGuideImplBase را ارثبری میکند. پیادهسازی ما به این صورت خواهد بود.
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
...
}
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
...
}
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
...
}
بیایید هر پیادهسازی RPC را با جزئیات بررسی کنیم.
RPC استریمینگ سمت سرور
در ادامه به یکی از RPCهای استریمینگ خود نگاهی میاندازیم. ListFeatures یک RPC استریمینگ سمت سرور است، بنابراین باید چندین Features به کلاینت خود ارسال کنیم.
private final Collection<Feature> features;
@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
}
مانند RPC ساده، این متد یک شیء درخواست ( Rectangle که کلاینت ما میخواهد Features در آن پیدا کند) و یک ناظر پاسخ StreamObserver دریافت میکند.
این بار، ما به تعداد مورد نیاز اشیاء Feature را برای بازگرداندن به کلاینت دریافت میکنیم (در این حالت، آنها را از مجموعه ویژگیهای سرویس بر اساس اینکه آیا درون Rectangle درخواست ما هستند یا خیر، انتخاب میکنیم) و آنها را به نوبت با استفاده از متد onNext() در ناظر پاسخ مینویسیم. در نهایت، مانند RPC سادهمان، از متد onCompleted() ناظر پاسخ استفاده میکنیم تا به gRPC بگوییم که نوشتن پاسخها را تمام کردهایم.
RPC استریمینگ سمت کلاینت
حالا بیایید به چیزی کمی پیچیدهتر نگاه کنیم: متد استریمینگ سمت کلاینت RecordRoute() ، که در آن جریانی از Points را از کلاینت دریافت میکنیم و یک RouteSummary واحد را با اطلاعات مربوط به سفر آنها برمیگردانیم.
@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
long startTime = System.nanoTime();
@Override
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
// For each point after the first, add the incremental distance from the previous point
// to the total distance value.
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in recordRoute", t);
}
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
}
};
}
همانطور که میبینید، مانند انواع متد قبلی، متد ما یک پارامتر responseObserver نوع StreamObserver دریافت میکند، اما این بار یک StreamObserver برای کلاینت برمیگرداند تا Points خود را بنویسد.
در بدنهی متد، یک StreamObserver ناشناس را برای بازگشت نمونهسازی میکنیم که در آن:
- متد
onNext()را برای دریافت ویژگیها و سایر اطلاعات، هر بار که کلاینت یکPointدر جریان پیام مینویسد، بازنویسی کنید. - متد
onCompleted()(که وقتی کلاینت نوشتن پیامها را تمام کرد فراخوانی میشود) را برای پر کردن و ساختRouteSummaryخود، بازنویسی میکنیم. سپس ناظر پاسخ متد خود، یعنیonNext()را باRouteSummaryخود فراخوانی میکنیم و سپس متدonCompleted()آن را برای پایان دادن به فراخوانی از سمت سرور فراخوانی میکنیم.
RPC استریمینگ دوطرفه
در نهایت، بیایید نگاهی به RPC استریمینگ دوطرفه RouteChat() خود بیندازیم.
@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onNext(prevNote);
}
// Now add the new note to the list
notes.add(note);
}
@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "Encountered error in routeChat", t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
همانند مثال استریمینگ سمت کلاینت، ما هم یک StreamObserver دریافت و هم برمیگردانیم، با این تفاوت که این بار مقادیر را از طریق ناظر پاسخ متد خود برمیگردانیم، در حالی که کلاینت هنوز در حال نوشتن پیامها در استریم پیام خود است. سینتکس خواندن و نوشتن در اینجا دقیقاً مشابه متدهای استریمینگ کلاینت و استریمینگ سرور است. اگرچه هر طرف همیشه پیامهای طرف دیگر را به ترتیبی که نوشته شدهاند دریافت میکند، اما هم کلاینت و هم سرور میتوانند به هر ترتیبی بخوانند و بنویسند - استریمها کاملاً مستقل عمل میکنند.
سرور را شروع کنید
وقتی همه متدهایمان را پیادهسازی کردیم، باید یک سرور gRPC راهاندازی کنیم تا کلاینتها بتوانند از سرویس ما استفاده کنند. قطعه کد زیر نحوه انجام این کار را برای سرویس RouteGuide نشان میدهد:
public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
public void start() throws IOException {
server.start();
logger.info("Server started, listening on " + port);
}
همانطور که میبینید، ما سرور خود را با استفاده از ServerBuilder میسازیم و راهاندازی میکنیم.
برای انجام این کار، ما:
- آدرس و پورتی را که میخواهیم برای گوش دادن به درخواستهای کلاینت استفاده کنیم، با استفاده از متد
forPort()سازنده مشخص کنید. - یک نمونه از کلاس پیادهسازی سرویس
RouteGuideServiceایجاد کنید و آن را به متدaddService()سازنده منتقل کنید. - برای ایجاد و شروع یک سرور RPC برای سرویس ما، توابع
build()وstart()را روی سازنده فراخوانی کنید.
از آنجایی که ServerBuilder از قبل پورت را در خود جای داده است، تنها دلیل ارسال یک پورت، استفاده از آن برای ثبت وقایع (logging) است.
۶. مشتری را ایجاد کنید
در این بخش، به ایجاد یک کلاینت برای سرویس RouteGuide خود خواهیم پرداخت. میتوانید کد کامل کلاینت نمونه ما را در ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java مشاهده کنید.
نمونهسازی یک مقاله خرد
برای فراخوانی متدهای سرویس، ابتدا باید یک stub یا بهتر بگوییم، دو stub ایجاد کنیم:
- یک stub مسدودکننده/همزمان : این بدان معناست که فراخوانی RPC منتظر پاسخ سرور میماند و یا پاسخی را برمیگرداند یا یک استثنا ایجاد میکند.
- یک stub غیر مسدودکننده/ناهمزمان که فراخوانیهای غیر مسدودکننده را به سرور انجام میدهد، و در آنجا پاسخ به صورت ناهمزمان بازگردانده میشود. شما میتوانید انواع خاصی از فراخوانیهای جریانی را فقط با استفاده از stub ناهمزمان انجام دهید.
ابتدا باید یک کانال gRPC برای stub خود ایجاد کنیم و آدرس سرور و پورتی را که میخواهیم به آن متصل شویم، مشخص کنیم:
public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
if (args.length > 0) {
if ("--help".equals(args[0])) {
System.err.println("Usage: [target]");
System.err.println("");
System.err.println(" target The server to connect to. Defaults to " + target);
System.exit(1);
}
target = args[0];
}
List<Feature> features;
try {
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
} catch (IOException ex) {
ex.printStackTrace();
return;
}
ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
.build();
try {
RouteGuideClient client = new RouteGuideClient(channel);
// Looking for features between 40, -75 and 42, -73.
client.listFeatures(400000000, -750000000, 420000000, -730000000);
// Record a few randomly selected points from the features file.
client.recordRoute(features, 10);
// Send and receive some notes.
CountDownLatch finishLatch = client.routeChat();
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
client.warning("routeChat did not finish within 1 minutes");
}
} finally {
channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
}
}
ما از یک ManagedChannelBuilder برای ایجاد کانال استفاده میکنیم.
حالا میتوانیم از کانال برای ایجاد stubهایمان با استفاده از متدهای newStub و newBlockingStub ارائه شده در کلاس RouteGuideGrpc که از .proto خود تولید کردهایم، استفاده کنیم.
public RouteGuideClient(Channel channel) {
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
asyncStub = RouteGuideGrpc.newStub(channel);
}
به یاد داشته باشید، اگر مسدودکننده نباشد، پس ناهمگام است.
روشهای سرویس تماس
حالا بیایید نگاهی به نحوه فراخوانی متدهای سرویس خود بیندازیم. توجه داشته باشید که هر RPC ایجاد شده از blocking stub در حالت blocking/synchronous عمل خواهد کرد، به این معنی که فراخوانی RPC منتظر پاسخ سرور میماند و یا پاسخی را برمیگرداند یا خطایی را نشان میدهد.
RPC استریمینگ سمت سرور
در مرحله بعد، بیایید به یک فراخوانی استریمینگ سمت سرور به ListFeatures نگاهی بیندازیم که استریمی از Feature جغرافیایی را برمیگرداند:
Rectangle request = Rectangle.newBuilder()
.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
Iterator<Feature> features;
try {
features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
همانطور که میبینید، این بسیار شبیه به RPC سادهی unary است که در آزمایشگاه کد Getting_Started_With_gRPC_Java بررسی کردیم، با این تفاوت که به جای بازگرداندن یک Feature ، این متد یک Iterator برمیگرداند که کلاینت میتواند از آن برای خواندن تمام Features برگردانده شده استفاده کند.
RPC استریمینگ سمت کلاینت
حالا به سراغ چیزی کمی پیچیدهتر میرویم: متد استریمینگ سمت کلاینت RecordRoute ، که در آن یک استریم از Points به سرور ارسال میکنیم و یک RouteSummary واحد دریافت میکنیم. برای این متد باید از stub ناهمزمان استفاده کنیم. اگر قبلاً بخش ایجاد سرور را خوانده باشید، ممکن است برخی از این موارد بسیار آشنا به نظر برسند - RPCهای استریمینگ ناهمزمان به روشی مشابه در هر دو طرف پیادهسازی میشوند.
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
}
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RecordRoute");
finishLatch.countDown();
}
};
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
// Send numPoints points randomly selected from the features list.
Random rand = new Random();
for (int i = 0; i < numPoints; ++i) {
int index = rand.nextInt(features.size());
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
// RPC completed or errored before we finished sending.
// Sending further requests won't error, but they will just be thrown away.
return;
}
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// Receiving happens asynchronously
finishLatch.await(1, TimeUnit.MINUTES);
}
همانطور که میبینید، برای فراخوانی این متد باید یک StreamObserver ایجاد کنیم که یک رابط ویژه برای سرور پیادهسازی میکند تا با پاسخ RouteSummary خود آن را فراخوانی کند. در StreamObserver خود، ما:
- متد
onNext()را برای چاپ اطلاعات برگشتی، هنگامی که سرور یکRouteSummaryدر جریان پیام مینویسد، بازنویسی کنید. - متد
onCompleted()(که وقتی سرور فراخوانی را در سمت خود به پایان رساند، فراخوانی میشود) را برای کاهشCountDownLatchبازنویسی کنید تا بتوانیم بررسی کنیم که آیا نوشتن سرور به پایان رسیده است یا خیر.
سپس StreamObserver به متد recordRoute() مربوط به stub ناهمزمان ارسال میکنیم و ناظر درخواست StreamObserver خودمان را برای نوشتن Points جهت ارسال به سرور دریافت میکنیم. پس از اتمام نوشتن امتیازها، از متد onCompleted() ناظر درخواست استفاده میکنیم تا به gRPC بگوییم که نوشتن در سمت کلاینت را تمام کردهایم. پس از اتمام کار، CountDownLatch خود را بررسی میکنیم تا ببینیم آیا سرور کار را در سمت خود به پایان رسانده است یا خیر.
RPC استریمینگ دوطرفه
در نهایت، بیایید نگاهی به RPC استریمینگ دوطرفه RouteChat() خود بیندازیم.
public CountDownLatch routeChat() {
info("*** RouteChat");
final CountDownLatch finishLatch = new CountDownLatch(1);
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@Override
public void onError(Throwable t) {
warning("RouteChat Failed: {0}", Status.fromThrowable(t));
finishLatch.countDown();
}
@Override
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});
try {
RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onNext(request);
}
} catch (RuntimeException e) {
// Cancel RPC
requestObserver.onError(e);
throw e;
}
// Mark the end of requests
requestObserver.onCompleted();
// return the latch while receiving happens asynchronously
return finishLatch;
}
همانند مثال استریمینگ سمت کلاینت، ما هم یک ناظر پاسخ StreamObserver دریافت و هم برمیگردانیم، با این تفاوت که این بار مقادیر را از طریق ناظر پاسخ متد خود ارسال میکنیم در حالی که سرور هنوز در حال نوشتن پیامها به جریان پیامهای آنها است. سینتکس خواندن و نوشتن در اینجا دقیقاً مشابه متد استریمینگ کلاینت ما است. اگرچه هر طرف همیشه پیامهای طرف دیگر را به ترتیبی که نوشته شدهاند دریافت میکند، اما هم کلاینت و هم سرور میتوانند به هر ترتیبی بخوانند و بنویسند - استریمها کاملاً مستقل عمل میکنند.
۷. امتحانش کن!
- از دایرکتوری
start_here:
$ ./gradlew installDist
این دستور کد شما را کامپایل میکند، آن را در یک jar بستهبندی میکند و اسکریپتهایی را ایجاد میکند که مثال را اجرا میکنند. این اسکریپتها در دایرکتوری build/install/start_here/bin/ ایجاد میشوند. این اسکریپتها عبارتند از: route-guide-server و route-guide-client .
قبل از شروع کلاینت، سرور باید در حال اجرا باشد.
- سرور را اجرا کنید:
$ ./build/install/start_here/bin/route-guide-server
- کلاینت را اجرا کنید:
$ ./build/install/start_here/bin/route-guide-client
۸. قدم بعدی چیست؟
- بیاموزید که gRPC چگونه کار میکند در مقدمهای بر gRPC و مفاهیم اصلی
- آموزش مبانی را دنبال کنید
- مرجع API را بررسی کنید.