شروع به کار با gRPC-Java - Streaming

۱. مقدمه

در این آزمایشگاه کد، شما از gRPC-Java برای ایجاد یک کلاینت و سرور استفاده خواهید کرد که پایه و اساس یک برنامه مسیریابی نوشته شده در جاوا را تشکیل می‌دهند.

در پایان آموزش، شما یک کلاینت خواهید داشت که با استفاده از gRPC به یک سرور راه دور متصل می‌شود تا اطلاعاتی در مورد ویژگی‌های مسیر کلاینت دریافت کند، خلاصه‌ای از مسیر کلاینت ایجاد کند و اطلاعات مسیر مانند به‌روزرسانی‌های ترافیک را با سرور و سایر کلاینت‌ها تبادل کند.

این سرویس در یک فایل Protocol Buffers تعریف شده است که برای تولید کد تکراری برای کلاینت و سرور استفاده می‌شود تا بتوانند با یکدیگر ارتباط برقرار کنند و در زمان و تلاش شما برای پیاده‌سازی آن قابلیت صرفه‌جویی شود.

این کد تولید شده نه تنها پیچیدگی‌های ارتباط بین سرور و کلاینت، بلکه سریال‌سازی و از سریال‌زدایی داده‌ها را نیز برطرف می‌کند.

آنچه یاد خواهید گرفت

  • نحوه استفاده از بافرهای پروتکل برای تعریف یک API سرویس.
  • نحوه ساخت یک کلاینت و سرور مبتنی بر gRPC از تعریف Protocol Buffers با استفاده از تولید خودکار کد.
  • آشنایی با ارتباطات استریمینگ کلاینت-سرور با gRPC

این آزمایشگاه کد برای توسعه‌دهندگان جاوا که تازه با gRPC آشنا شده‌اند یا به دنبال مرور gRPC هستند، یا هر کسی که به ساخت سیستم‌های توزیع‌شده علاقه‌مند است، مناسب است. هیچ تجربه قبلی gRPC لازم نیست.

۲. قبل از شروع

پیش‌نیازها

  • JDK نسخه ۲۴

کد را دریافت کنید

برای اینکه مجبور نباشید کاملاً از ابتدا شروع کنید، این codelab چارچوبی از کد منبع برنامه را برای تکمیل شما فراهم می‌کند. مراحل زیر نحوه تکمیل برنامه، از جمله استفاده از افزونه‌های کامپایلر بافر پروتکل برای تولید کد gRPC قالب‌بندی شده را به شما نشان می‌دهد.

ابتدا، دایرکتوری کاری codelab را ایجاد کنید و با دستور cd به آن وارد شوید:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

کدلب را دانلود و استخراج کنید:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

روش دیگر این است که فایل .zip که فقط شامل دایرکتوری codelab است را دانلود کرده و به صورت دستی آن را از حالت فشرده خارج کنید.

اگر می‌خواهید از تایپ کردن پیاده‌سازی صرف‌نظر کنید، کد منبع تکمیل‌شده در گیت‌هاب موجود است.

۳. تعریف پیام‌ها و خدمات

اولین قدم شما تعریف سرویس gRPC برنامه، متد RPC آن و انواع پیام‌های درخواست و پاسخ آن با استفاده از Protocol Buffers است. سرویس شما موارد زیر را ارائه خواهد داد:

  • متدهای RPC به نام‌های ListFeatures ، RecordRoute و RouteChat که سرور پیاده‌سازی می‌کند و کلاینت آنها را فراخوانی می‌کند.
  • انواع پیام Point ، Feature ، Rectangle ، RouteNote و RouteSummary هستند که ساختارهای داده‌ای هستند که هنگام فراخوانی متدهای بالا بین کلاینت و سرور رد و بدل می‌شوند.

بافرهای پروتکل معمولاً به عنوان protobufs شناخته می‌شوند. برای اطلاعات بیشتر در مورد اصطلاحات gRPC، به مفاهیم اصلی، معماری و چرخه حیات gRPC مراجعه کنید.

این متد RPC و انواع پیام‌های آن، همگی در فایل proto/routeguide/route_guide.proto از کد منبع ارائه شده تعریف خواهند شد.

بیایید یک فایل route_guide.proto ایجاد کنیم.

از آنجایی که در این مثال کد جاوا تولید می‌کنیم، گزینه فایل java_package را در .proto خود مشخص کرده‌ایم:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

تعریف انواع پیام

در فایل proto/routeguide/route_guide.proto از کد منبع، ابتدا نوع پیام Point را تعریف کنید. یک Point نشان دهنده یک جفت مختصات طول و عرض جغرافیایی روی نقشه است. برای این codelab، از اعداد صحیح برای مختصات استفاده کنید:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

اعداد 1 و 2 شماره‌های شناسایی منحصر به فرد برای هر یک از فیلدهای موجود در ساختار message هستند.

سپس، نوع پیام Feature را تعریف کنید. یک Feature از یک فیلد string برای نام یا آدرس پستی چیزی در مکانی که توسط Point مشخص شده است، استفاده می‌کند:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

برای اینکه چندین نقطه در یک منطقه بتوانند به یک کلاینت ارسال شوند، به یک پیام Rectangle نیاز دارید که یک مستطیل طول و عرض جغرافیایی را نشان می‌دهد، که به صورت دو نقطه روبروی هم به lo hi نمایش داده می‌شود:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

همچنین، یک پیام RouteNote که نشان‌دهنده پیامی است که در یک نقطه معین ارسال شده است:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

در نهایت، به یک پیام RouteSummary نیاز خواهید داشت. این پیام در پاسخ به یک RecordRoute RPC دریافت می‌شود که در بخش بعدی توضیح داده شده است. این پیام شامل تعداد نقاط دریافت شده، تعداد عوارض شناسایی شده و کل مسافت طی شده به عنوان مجموع تجمعی فاصله بین هر نقطه است.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

تعریف روش‌های سرویس

برای تعریف یک سرویس، شما یک سرویس نامگذاری شده را در فایل .proto خود مشخص می‌کنید. فایل route_guide.proto دارای یک ساختار service به نام RouteGuide است که یک یا چند متد ارائه شده توسط سرویس برنامه را تعریف می‌کند.

وقتی متدهای RPC را در تعریف سرویس خود تعریف می‌کنید، نوع درخواست و پاسخ آنها را مشخص می‌کنید. در این بخش از codelab، بیایید موارد زیر را تعریف کنیم:

ویژگی‌ها

اشیاء Feature موجود در Rectangle داده شده را دریافت می‌کند. نتایج به جای اینکه به طور همزمان بازگردانده شوند، به صورت جریانی ارسال می‌شوند، زیرا مستطیل ممکن است ناحیه بزرگی را پوشش دهد و شامل تعداد زیادی ویژگی باشد.

برای این برنامه، شما از یک RPC استریمینگ سمت سرور استفاده خواهید کرد: کلاینت یک درخواست به سرور ارسال می‌کند و یک استریم برای خواندن دنباله ای از پیام‌ها دریافت می‌کند. کلاینت از استریم برگشتی می‌خواند تا زمانی که دیگر پیامی وجود نداشته باشد. همانطور که در مثال ما می‌بینید، شما یک متد استریمینگ سمت سرور را با قرار دادن کلمه کلیدی stream قبل از نوع پاسخ مشخص می‌کنید.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

رکوردروت

جریانی از نقاط روی مسیری که پیمایش می‌شود را می‌پذیرد و پس از اتمام پیمایش، یک RouteSummary برمی‌گرداند.

یک RPC استریمینگ سمت کلاینت در این مورد مناسب است: کلاینت دنباله ای از پیام ها را می نویسد و آنها را دوباره با استفاده از یک استریم ارائه شده به سرور ارسال می کند. پس از اینکه کلاینت نوشتن پیام ها را تمام کرد، منتظر می ماند تا سرور همه آنها را بخواند و پاسخ خود را برگرداند. شما با قرار دادن کلمه کلیدی stream قبل از نوع درخواست، یک روش استریمینگ سمت کلاینت را مشخص می کنید.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

روت‌چت

جریانی از RouteNotes را که هنگام پیمایش یک مسیر ارسال می‌شوند، می‌پذیرد، در حالی که RouteNotes دیگری (مثلاً از سایر کاربران) را دریافت می‌کند.

این دقیقاً همان نوع کاربرد استریمینگ دوطرفه است. یک RPC استریمینگ دوطرفه که در آن هر دو طرف با استفاده از یک استریم خواندنی-نوشتنی، دنباله‌ای از پیام‌ها را ارسال می‌کنند. این دو استریم به‌طور مستقل عمل می‌کنند، بنابراین کلاینت‌ها و سرورها می‌توانند به هر ترتیبی که دوست دارند، بخوانند و بنویسند: برای مثال، سرور می‌تواند قبل از نوشتن پاسخ‌های خود، منتظر دریافت تمام پیام‌های کلاینت بماند، یا می‌تواند به‌طور متناوب یک پیام را بخواند و سپس یک پیام بنویسد، یا ترکیب دیگری از خواندن و نوشتن. ترتیب پیام‌ها در هر استریم حفظ می‌شود. شما این نوع متد را با قرار دادن کلمه کلیدی stream قبل از درخواست و پاسخ مشخص می‌کنید.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

۴. تولید کد کلاینت و سرور

در مرحله بعد باید رابط‌های کلاینت و سرور gRPC را از تعریف سرویس .proto خود تولید کنیم. ما این کار را با استفاده از کامپایلر بافر پروتکل protoc با یک افزونه ویژه gRPC Java انجام می‌دهیم. برای تولید سرویس‌های gRPC باید از کامپایلر proto3 (که از هر دو سینتکس proto2 و proto3 پشتیبانی می‌کند) استفاده کنید.

هنگام استفاده از Gradle یا Maven، افزونه protoc build می‌تواند کد لازم را به عنوان بخشی از ساخت تولید کند. برای نحوه تولید کد از فایل‌های .proto خودتان، می‌توانید به grpc-java README مراجعه کنید.

ما پیکربندی Gradle را ارائه داده‌ایم.

از دایرکتوری streaming-grpc-java-getting-started دستور زیر را وارد کنید:

$ chmod +x gradlew
$ ./gradlew generateProto

کلاس‌های زیر از تعریف سرویس ما (در مسیر build/generated/sources/proto/main/java ) تولید می‌شوند:

  • یکی برای هر نوع پیام: Feature.java ، Rectangle.java, ... که شامل تمام کد بافر پروتکل برای پر کردن، سریال‌سازی و بازیابی انواع پیام‌های درخواست و پاسخ ما است.
  • RouteGuideGrpc.java که شامل (همراه با برخی کدهای مفید دیگر) یک کلاس پایه برای پیاده‌سازی سرورهای RouteGuide ، RouteGuideGrpc.RouteGuideImplBase ، به همراه تمام متدهای تعریف شده در سرویس RouteGuide و کلاس‌های stub برای استفاده کلاینت‌ها است.

۵. پیاده‌سازی سرویس

ابتدا بیایید نگاهی به نحوه ایجاد یک سرور RouteGuide بیندازیم. برای اینکه سرویس RouteGuide ما کارش را انجام دهد، دو بخش وجود دارد:

  • پیاده‌سازی رابط سرویس تولید شده از تعریف سرویس ما: انجام "کار" واقعی سرویس ما.
  • اجرای یک سرور gRPC برای گوش دادن به درخواست‌های کلاینت‌ها و ارسال آنها به پیاده‌سازی صحیح سرویس.

پیاده‌سازی RouteGuide

ما یک کلاس RouteGuideService پیاده‌سازی خواهیم کرد که کلاس تولید شده‌ی RouteGuideGrpc.RouteGuideImplBase را ارث‌بری می‌کند. پیاده‌سازی ما به این صورت خواهد بود.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

بیایید هر پیاده‌سازی RPC را با جزئیات بررسی کنیم.

RPC استریمینگ سمت سرور

در ادامه به یکی از RPCهای استریمینگ خود نگاهی می‌اندازیم. ListFeatures یک RPC استریمینگ سمت سرور است، بنابراین باید چندین Features به کلاینت خود ارسال کنیم.

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

مانند RPC ساده، این متد یک شیء درخواست ( Rectangle که کلاینت ما می‌خواهد Features در آن پیدا کند) و یک ناظر پاسخ StreamObserver دریافت می‌کند.

این بار، ما به تعداد مورد نیاز اشیاء Feature را برای بازگرداندن به کلاینت دریافت می‌کنیم (در این حالت، آنها را از مجموعه ویژگی‌های سرویس بر اساس اینکه آیا درون Rectangle درخواست ما هستند یا خیر، انتخاب می‌کنیم) و آنها را به نوبت با استفاده از متد onNext() در ناظر پاسخ می‌نویسیم. در نهایت، مانند RPC ساده‌مان، از متد onCompleted() ناظر پاسخ استفاده می‌کنیم تا به gRPC بگوییم که نوشتن پاسخ‌ها را تمام کرده‌ایم.

RPC استریمینگ سمت کلاینت

حالا بیایید به چیزی کمی پیچیده‌تر نگاه کنیم: متد استریمینگ سمت کلاینت RecordRoute() ، که در آن جریانی از Points را از کلاینت دریافت می‌کنیم و یک RouteSummary واحد را با اطلاعات مربوط به سفر آنها برمی‌گردانیم.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

همانطور که می‌بینید، مانند انواع متد قبلی، متد ما یک پارامتر responseObserver نوع StreamObserver دریافت می‌کند، اما این بار یک StreamObserver برای کلاینت برمی‌گرداند تا Points خود را بنویسد.

در بدنه‌ی متد، یک StreamObserver ناشناس را برای بازگشت نمونه‌سازی می‌کنیم که در آن:

  • متد onNext() را برای دریافت ویژگی‌ها و سایر اطلاعات، هر بار که کلاینت یک Point در جریان پیام می‌نویسد، بازنویسی کنید.
  • متد onCompleted() (که وقتی کلاینت نوشتن پیام‌ها را تمام کرد فراخوانی می‌شود) را برای پر کردن و ساخت RouteSummary خود، بازنویسی می‌کنیم. سپس ناظر پاسخ متد خود، یعنی onNext() را با RouteSummary خود فراخوانی می‌کنیم و سپس متد onCompleted() آن را برای پایان دادن به فراخوانی از سمت سرور فراخوانی می‌کنیم.

RPC استریمینگ دوطرفه

در نهایت، بیایید نگاهی به RPC استریمینگ دوطرفه RouteChat() خود بیندازیم.

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

همانند مثال استریمینگ سمت کلاینت، ما هم یک StreamObserver دریافت و هم برمی‌گردانیم، با این تفاوت که این بار مقادیر را از طریق ناظر پاسخ متد خود برمی‌گردانیم، در حالی که کلاینت هنوز در حال نوشتن پیام‌ها در استریم پیام خود است. سینتکس خواندن و نوشتن در اینجا دقیقاً مشابه متدهای استریمینگ کلاینت و استریمینگ سرور است. اگرچه هر طرف همیشه پیام‌های طرف دیگر را به ترتیبی که نوشته شده‌اند دریافت می‌کند، اما هم کلاینت و هم سرور می‌توانند به هر ترتیبی بخوانند و بنویسند - استریم‌ها کاملاً مستقل عمل می‌کنند.

سرور را شروع کنید

وقتی همه متدهایمان را پیاده‌سازی کردیم، باید یک سرور gRPC راه‌اندازی کنیم تا کلاینت‌ها بتوانند از سرویس ما استفاده کنند. قطعه کد زیر نحوه انجام این کار را برای سرویس RouteGuide نشان می‌دهد:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

همانطور که می‌بینید، ما سرور خود را با استفاده از ServerBuilder می‌سازیم و راه‌اندازی می‌کنیم.

برای انجام این کار، ما:

  1. آدرس و پورتی را که می‌خواهیم برای گوش دادن به درخواست‌های کلاینت استفاده کنیم، با استفاده از متد forPort() سازنده مشخص کنید.
  2. یک نمونه از کلاس پیاده‌سازی سرویس RouteGuideService ایجاد کنید و آن را به متد addService() سازنده منتقل کنید.
  3. برای ایجاد و شروع یک سرور RPC برای سرویس ما، توابع build() و start() را روی سازنده فراخوانی کنید.

از آنجایی که ServerBuilder از قبل پورت را در خود جای داده است، تنها دلیل ارسال یک پورت، استفاده از آن برای ثبت وقایع (logging) است.

۶. مشتری را ایجاد کنید

در این بخش، به ایجاد یک کلاینت برای سرویس RouteGuide خود خواهیم پرداخت. می‌توانید کد کامل کلاینت نمونه ما را در ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java مشاهده کنید.

نمونه‌سازی یک مقاله خرد

برای فراخوانی متدهای سرویس، ابتدا باید یک stub یا بهتر بگوییم، دو stub ایجاد کنیم:

  • یک stub مسدودکننده/همزمان : این بدان معناست که فراخوانی RPC منتظر پاسخ سرور می‌ماند و یا پاسخی را برمی‌گرداند یا یک استثنا ایجاد می‌کند.
  • یک stub غیر مسدودکننده/ناهمزمان که فراخوانی‌های غیر مسدودکننده را به سرور انجام می‌دهد، و در آنجا پاسخ به صورت ناهمزمان بازگردانده می‌شود. شما می‌توانید انواع خاصی از فراخوانی‌های جریانی را فقط با استفاده از stub ناهمزمان انجام دهید.

ابتدا باید یک کانال gRPC برای stub خود ایجاد کنیم و آدرس سرور و پورتی را که می‌خواهیم به آن متصل شویم، مشخص کنیم:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

ما از یک ManagedChannelBuilder برای ایجاد کانال استفاده می‌کنیم.

حالا می‌توانیم از کانال برای ایجاد stubهایمان با استفاده از متدهای newStub و newBlockingStub ارائه شده در کلاس RouteGuideGrpc که از .proto خود تولید کرده‌ایم، استفاده کنیم.

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

به یاد داشته باشید، اگر مسدودکننده نباشد، پس ناهمگام است.

روش‌های سرویس تماس

حالا بیایید نگاهی به نحوه فراخوانی متدهای سرویس خود بیندازیم. توجه داشته باشید که هر RPC ایجاد شده از blocking stub در حالت blocking/synchronous عمل خواهد کرد، به این معنی که فراخوانی RPC منتظر پاسخ سرور می‌ماند و یا پاسخی را برمی‌گرداند یا خطایی را نشان می‌دهد.

RPC استریمینگ سمت سرور

در مرحله بعد، بیایید به یک فراخوانی استریمینگ سمت سرور به ListFeatures نگاهی بیندازیم که استریمی از Feature جغرافیایی را برمی‌گرداند:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

همانطور که می‌بینید، این بسیار شبیه به RPC ساده‌ی unary است که در آزمایشگاه کد Getting_Started_With_gRPC_Java بررسی کردیم، با این تفاوت که به جای بازگرداندن یک Feature ، این متد یک Iterator برمی‌گرداند که کلاینت می‌تواند از آن برای خواندن تمام Features برگردانده شده استفاده کند.

RPC استریمینگ سمت کلاینت

حالا به سراغ چیزی کمی پیچیده‌تر می‌رویم: متد استریمینگ سمت کلاینت RecordRoute ، که در آن یک استریم از Points به سرور ارسال می‌کنیم و یک RouteSummary واحد دریافت می‌کنیم. برای این متد باید از stub ناهمزمان استفاده کنیم. اگر قبلاً بخش ایجاد سرور را خوانده باشید، ممکن است برخی از این موارد بسیار آشنا به نظر برسند - RPCهای استریمینگ ناهمزمان به روشی مشابه در هر دو طرف پیاده‌سازی می‌شوند.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

همانطور که می‌بینید، برای فراخوانی این متد باید یک StreamObserver ایجاد کنیم که یک رابط ویژه برای سرور پیاده‌سازی می‌کند تا با پاسخ RouteSummary خود آن را فراخوانی کند. در StreamObserver خود، ما:

  • متد onNext() را برای چاپ اطلاعات برگشتی، هنگامی که سرور یک RouteSummary در جریان پیام می‌نویسد، بازنویسی کنید.
  • متد onCompleted() (که وقتی سرور فراخوانی را در سمت خود به پایان رساند، فراخوانی می‌شود) را برای کاهش CountDownLatch بازنویسی کنید تا بتوانیم بررسی کنیم که آیا نوشتن سرور به پایان رسیده است یا خیر.

سپس StreamObserver به متد recordRoute() مربوط به stub ناهمزمان ارسال می‌کنیم و ناظر درخواست StreamObserver خودمان را برای نوشتن Points جهت ارسال به سرور دریافت می‌کنیم. پس از اتمام نوشتن امتیازها، از متد onCompleted() ناظر درخواست استفاده می‌کنیم تا به gRPC بگوییم که نوشتن در سمت کلاینت را تمام کرده‌ایم. پس از اتمام کار، CountDownLatch خود را بررسی می‌کنیم تا ببینیم آیا سرور کار را در سمت خود به پایان رسانده است یا خیر.

RPC استریمینگ دوطرفه

در نهایت، بیایید نگاهی به RPC استریمینگ دوطرفه RouteChat() خود بیندازیم.

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

همانند مثال استریمینگ سمت کلاینت، ما هم یک ناظر پاسخ StreamObserver دریافت و هم برمی‌گردانیم، با این تفاوت که این بار مقادیر را از طریق ناظر پاسخ متد خود ارسال می‌کنیم در حالی که سرور هنوز در حال نوشتن پیام‌ها به جریان پیام‌های آنها است. سینتکس خواندن و نوشتن در اینجا دقیقاً مشابه متد استریمینگ کلاینت ما است. اگرچه هر طرف همیشه پیام‌های طرف دیگر را به ترتیبی که نوشته شده‌اند دریافت می‌کند، اما هم کلاینت و هم سرور می‌توانند به هر ترتیبی بخوانند و بنویسند - استریم‌ها کاملاً مستقل عمل می‌کنند.

۷. امتحانش کن!

  1. از دایرکتوری start_here :
$ ./gradlew installDist

این دستور کد شما را کامپایل می‌کند، آن را در یک jar بسته‌بندی می‌کند و اسکریپت‌هایی را ایجاد می‌کند که مثال را اجرا می‌کنند. این اسکریپت‌ها در دایرکتوری build/install/start_here/bin/ ایجاد می‌شوند. این اسکریپت‌ها عبارتند از: route-guide-server و route-guide-client .

قبل از شروع کلاینت، سرور باید در حال اجرا باشد.

  1. سرور را اجرا کنید:
$ ./build/install/start_here/bin/route-guide-server
  1. کلاینت را اجرا کنید:
$ ./build/install/start_here/bin/route-guide-client

۸. قدم بعدی چیست؟