תחילת העבודה עם gRPC-Java – סטרימינג

1. מבוא

ב-codelab הזה תשתמשו ב-gRPC-Java כדי ליצור לקוח ושרת שיהוו את הבסיס לאפליקציה למיפוי מסלולים שנכתבה ב-Java.

בסוף המדריך יהיה לכם לקוח שמתחבר לשרת מרוחק באמצעות gRPC כדי לקבל מידע על תכונות במסלול של לקוח, ליצור סיכום של המסלול של הלקוח ולהחליף מידע על המסלול, כמו עדכוני תנועה, עם השרת ועם לקוחות אחרים.

השירות מוגדר בקובץ Protocol Buffers, שישמש ליצירת קוד boilerplate ללקוח ולשרת, כדי שהם יוכלו לתקשר זה עם זה. כך תוכלו לחסוך זמן ומאמץ בהטמעת הפונקציונליות הזו.

הקוד שנוצר מטפל לא רק במורכבויות של התקשורת בין השרת ללקוח, אלא גם בסריאליזציה ובדה-סריאליזציה של הנתונים.

מה תלמדו

  • איך משתמשים ב-Protocol Buffers כדי להגדיר API של שירות.
  • איך ליצור לקוח ושרת מבוססי gRPC מהגדרה של Protocol Buffers באמצעות יצירת קוד אוטומטית.
  • הבנה של תקשורת סטרימינג בין לקוח לשרת באמצעות gRPC.

ה-codelab הזה מיועד למפתחי Java שחדשים ב-gRPC או שרוצים לרענן את הידע שלהם ב-gRPC, או לכל מי שמתעניין ביצירת מערכות מבוזרות. לא נדרש ניסיון קודם ב-gRPC.

‫2. לפני שמתחילים

דרישות מוקדמות

  • גרסה 24 של JDK.

קבל את הקוד

כדי שלא תצטרכו להתחיל מאפס, ב-codelab הזה מופיע סקאפולד של קוד המקור של האפליקציה שתוכלו להשלים. בשלבים הבאים נסביר איך לסיים את האפליקציה, כולל שימוש בתוספים של קומפיילר מאגר אחסון לפרוטוקולים כדי ליצור את קוד ה-boilerplate של gRPC.

קודם יוצרים את ספריית העבודה של ה-codelab ועוברים אליה:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

מורידים ומחלצים את ה-codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

אפשרות אחרת היא להוריד את קובץ ה-‎ .zip שמכיל רק את ספריית ה-codelab ולבטל את הדחיסה שלו באופן ידני.

קוד המקור המלא זמין ב-GitHub אם אתם רוצים לדלג על הקלדת ההטמעה.

3. הגדרת הודעות ושירותים

השלב הראשון הוא להגדיר את שירות ה-gRPC של האפליקציה, את שיטת ה-RPC ואת סוגי ההודעות של הבקשה והתגובה באמצעות Protocol Buffers. השירות שלכם יספק:

  • שיטות RPC שנקראות ListFeatures, ‏ RecordRoute ו-RouteChat, שהשרת מטמיע והלקוח קורא להן.
  • סוגי ההודעות Point,‏ Feature,‏ Rectangle,‏ RouteNote ו-RouteSummary, שהן מבני נתונים שמועברים בין הלקוח לשרת כשמפעילים את השיטות שלמעלה.

פרוטוקול Buffers ידוע בדרך כלל בשם protobufs. מידע נוסף על המינוח של gRPC זמין במאמר מושגים מרכזיים, ארכיטקטורה ומחזור חיים של gRPC.

שיטת ה-RPC הזו וסוגי ההודעות שלה יוגדרו בקובץ proto/routeguide/route_guide.proto של קוד המקור שסופק.

ניצור קובץ route_guide.proto.

בדוגמה הזו אנחנו יוצרים קוד Java, ולכן ציינו אפשרות של קובץ java_package ב-.proto:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

הגדרת סוגי הודעות

בקובץ proto/routeguide/route_guide.proto של קוד המקור, מגדירים קודם את סוג ההודעה Point. הסמל Point מייצג זוג קואורדינטות של קו רוחב וקו אורך במפה. ב-codelab הזה, משתמשים במספרים שלמים לקואורדינטות:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

המספרים 1 ו-2 הם מספרי מזהים ייחודיים לכל אחד מהשדות במבנה message.

לאחר מכן, מגדירים את Feature סוג ההודעה. ‫Feature משתמש בשדה string כדי לציין את השם או הכתובת למשלוח דואר של משהו במיקום שצוין על ידי Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

כדי להזרים ללקוח כמה נקודות באזור מסוים, צריך להשתמש בהודעה Rectangle שמייצגת מלבן של קווי רוחב ואורך, שמיוצג על ידי שתי נקודות מנוגדות באלכסון lo ו-hi:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

בנוסף, הודעה RouteNote שמייצגת הודעה שנשלחה בנקודה מסוימת:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

לבסוף, תצטרכו הודעה ב-RouteSummary. ההודעה הזו מתקבלת בתגובה ל-RPC‏ RecordRoute, שמוסבר בקטע הבא. הוא מכיל את מספר הנקודות הבודדות שהתקבלו, את מספר התכונות שזוהו ואת המרחק הכולל שעבר, כסכום מצטבר של המרחק בין כל נקודה.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

הגדרת שיטות שירות

כדי להגדיר שירות, מציינים שירות עם שם בקובץ .proto. לקובץ route_guide.proto יש מבנה service בשם RouteGuide שמגדיר שיטה אחת או יותר שמוגדרות בשירות של האפליקציה.

כשמגדירים שיטות RPC בהגדרת השירות, מציינים את סוגי הבקשות והתגובות שלהן. בקטע הזה של ה-codelab, נגדיר:

ListFeatures

הפונקציה מחזירה את האובייקטים Feature שזמינים בתוך Rectangle הנתון. התוצאות מועברות בסטרימינג ולא מוחזרות בבת אחת, כי יכול להיות שהמלבן יכסה אזור גדול ויכיל מספר עצום של תכונות.

באפליקציה הזו, תשתמשו ב-RPC של סטרימינג בצד השרת: הלקוח שולח בקשה לשרת ומקבל סטרימינג כדי לקרוא רצף של הודעות בחזרה. הלקוח קורא מהזרם שהוחזר עד שאין יותר הודעות. כפי שאפשר לראות בדוגמה שלנו, כדי לציין שיטת סטרימינג בצד השרת, צריך להציב את מילת המפתח stream לפני סוג התגובה.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

מקבל פלט של נקודות במסלול שמוגדר, ומחזיר RouteSummary כשהמסלול מסתיים.

במקרה כזה, מתאים להשתמש ב-RPC של סטרימינג מצד הלקוח: הלקוח כותב רצף של הודעות ושולח אותן לשרת, שוב באמצעות סטרימינג שסופק. אחרי שהלקוח מסיים לכתוב את ההודעות, הוא מחכה שהשרת יקרא את כולן ויחזיר את התגובה שלו. כדי לציין שיטת סטרימינג בצד הלקוח, מציבים את מילת המפתח stream לפני סוג הבקשה.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

מקבלת זרם של RouteNotes שנשלח בזמן שמתבצעת תנועה במסלול, תוך כדי קבלת RouteNotes אחרים (למשל ממשתמשים אחרים).

זה בדיוק תרחיש השימוש בסטרימינג דו-כיווני. בקשת RPC לסטרימינג דו-כיווני שבה שני הצדדים שולחים רצף של הודעות באמצעות סטרימינג לקריאה ולכתיבה. שני הזרמים פועלים באופן עצמאי, כך שהלקוחות והשרתים יכולים לקרוא ולכתוב בכל סדר שירצו: לדוגמה, השרת יכול לחכות עד שיקבל את כל ההודעות מהלקוח לפני שיכתוב את התשובות שלו, או שהוא יכול לקרוא הודעה ואז לכתוב הודעה, או כל שילוב אחר של קריאות וכתיבות. הסדר של ההודעות בכל זרם נשמר. כדי לציין את סוג ה-method הזה, צריך להוסיף את מילת המפתח stream לפני הבקשה ולפני התשובה.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. יצירת קוד לקוח וקוד שרת

עכשיו צריך ליצור את הממשקים של לקוח ושרת gRPC מהגדרת השירות .proto. אנחנו עושים את זה באמצעות מהדר מאגר אחסון לפרוטוקולים protoc עם פלאגין מיוחד של gRPC Java. כדי ליצור שירותי gRPC, צריך להשתמש בקומפיילר proto3 (שתומך בתחביר של proto2 ו-proto3).

כשמשתמשים ב-Gradle או ב-Maven, תוסף ה-build של protoc יכול ליצור את הקוד הנדרש כחלק מה-build. אפשר לעיין ב-README של grpc-java כדי ללמוד איך ליצור קוד מקובצי .proto משלכם.

סיפקנו הגדרת Gradle.

במאגר streaming-grpc-java-getting-started, מזינים

$ chmod +x gradlew
$ ./gradlew generateProto

הכיתות הבאות נוצרות מהגדרת השירות שלנו (בקטע build/generated/sources/proto/main/java):

  • אחד לכל סוג הודעה: Feature.java, Rectangle.java, .... הקבצים האלה מכילים את כל קוד מאגר אחסון לפרוטוקולים לאכלוס, לסריאליזציה ולאחזור של סוגי ההודעות של הבקשה והתגובה שלנו.
  • RouteGuideGrpc.java שמכיל (יחד עם קוד שימושי אחר) מחלקת בסיס להטמעה של שרתים מסוג RouteGuide, ‏ RouteGuideGrpc.RouteGuideImplBase, עם כל השיטות שמוגדרות בשירות RouteGuide ובמחלקות stub לשימוש של לקוחות

5. הטמעה של השירות

קודם נראה איך יוצרים RouteGuide שרת. יש שני חלקים בתהליך שבו שירות RouteGuide מבצע את העבודה שלו:

  • הטמעה של ממשק השירות שנוצר מהגדרת השירות שלנו: ביצוע ה "עבודה" בפועל של השירות שלנו.
  • הפעלת שרת gRPC להאזנה לבקשות מלקוחות ולשליחתן להטמעה הנכונה של השירות.

הטמעה של RouteGuide

אנחנו נטמיע מחלקה RouteGuideService שתהיה הרחבה של המחלקה RouteGuideGrpc.RouteGuideImplBase שנוצרה. כך ייראה היישום.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

נבחן כל הטמעה של RPC בפירוט

RPC של סטרימינג בצד השרת

עכשיו נסתכל על אחת משיטות ה-RPC של הסטרימינג. ‫ListFeatures הוא RPC של סטרימינג מצד השרת, ולכן אנחנו צריכים לשלוח בחזרה כמה Features ללקוח שלנו.

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

בדומה ל-RPC הפשוט, השיטה הזו מקבלת אובייקט בקשה (Rectangle שבו הלקוח רוצה למצוא את Features) ואובייקט תגובה StreamObserver.

הפעם, אנחנו מקבלים כמה אובייקטים של Feature שצריך כדי לחזור ללקוח (במקרה הזה, אנחנו בוחרים אותם מתוך אוסף התכונות של השירות, בהתאם לשאלה אם הם נמצאים בבקשה Rectangle שלנו), וכותבים כל אחד מהם בתורו למאזין התגובה באמצעות השיטה onNext() שלו. לבסוף, כמו ב-RPC הפשוט שלנו, אנחנו משתמשים בשיטה onCompleted() של אובייקט המעקב אחר התגובה כדי לציין ל-gRPC שסיימנו לכתוב תגובות.

RPC של סטרימינג מצד הלקוח

עכשיו נסתכל על משהו קצת יותר מסובך: שיטת הסטרימינג בצד הלקוח RecordRoute(), שבה אנחנו מקבלים סטרימינג של Points מהלקוח ומחזירים RouteSummary יחיד עם מידע על הנסיעה שלו.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

כפי שאפשר לראות, כמו בסוגי השיטות הקודמים, השיטה שלנו מקבלת פרמטר StreamObserver responseObserver, אבל הפעם היא מחזירה StreamObserver כדי שהלקוח יוכל לכתוב את Points.

בגוף השיטה אנחנו יוצרים מופע של StreamObserver אנונימי כדי להחזיר, שבו:

  • אפשר לבטל את השיטה onNext() כדי לקבל תכונות ומידע אחר בכל פעם שהלקוח כותב Point לזרם ההודעות.
  • מבטלים את השיטה onCompleted() (שמופעלת כשהלקוח מסיים לכתוב הודעות) כדי לאכלס ולבנות את RouteSummary. לאחר מכן, אנחנו מפעילים את onNext() של משקיף התגובה של השיטה עם RouteSummary, ואז מפעילים את השיטה onCompleted() כדי לסיים את השיחה מצד השרת.

Bidirectional streaming RPC

לבסוף, נבחן את ה-RPC של הסטרימינג הדו-כיווני RouteChat().

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

בדומה לדוגמה שלנו לסטרימינג בצד הלקוח, אנחנו מקבלים ומחזירים StreamObserver, אבל הפעם אנחנו מחזירים ערכים דרך משקיף התגובה של השיטה שלנו בזמן שהלקוח עדיין כותב הודעות לזרם ההודעות שלו. התחביר לקריאה ולכתיבה כאן זהה בדיוק לזה של שיטות הזרמת הנתונים מהלקוח ומהשרת. למרות שכל צד תמיד יקבל את ההודעות של הצד השני בסדר שבו הן נכתבו, גם הלקוח וגם השרת יכולים לקרוא ולכתוב בכל סדר – הזרמים פועלים באופן עצמאי לחלוטין.

מפעילים את השרת.

אחרי שמטמיעים את כל השיטות, צריך גם להפעיל שרת gRPC כדי שהלקוחות יוכלו להשתמש בשירות. בקטע הקוד הבא אפשר לראות איך אנחנו עושים את זה בשירות RouteGuide:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

כפי שאפשר לראות, אנחנו יוצרים את השרת ומתחילים להשתמש בו באמצעות ServerBuilder.

כדי לעשות את זה, אנחנו:

  1. מציינים את הכתובת והיציאה שרוצים להשתמש בהן כדי להאזין לבקשות של הלקוח באמצעות השיטה forPort() של האובייקט builder.
  2. יוצרים מופע של מחלקת הטמעת השירות RouteGuideService ומעבירים אותו לשיטת addService() של הכלי ליצירת מופעים.
  3. מפעילים את הפונקציות build() ו-start() ב-builder כדי ליצור ולהפעיל שרת RPC לשירות שלנו.

מכיוון שהיציאה כבר משולבת ב-ServerBuilder, הסיבה היחידה להעברת יציאה היא שימוש בה לצורך רישום ביומן.

6. יצירת הלקוח

בקטע הזה נראה איך ליצור לקוח לשירות RouteGuide. קוד הלקוח המלא לדוגמה מופיע ב../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java.

יצירת מופע של stub

כדי להתקשר לשיטות שירות, אנחנו צריכים קודם ליצור stub, או ליתר דיוק, שני stubs:

  • סטאב חוסם/סינכרוני: המשמעות היא שהקריאה ל-RPC ממתינה לתגובה מהשרת, ותחזיר תגובה או תעלה חריגה.
  • ‫stub לא חוסם/אסינכרוני שמבצע קריאות לא חוסמות לשרת, שבהן התגובה מוחזרת באופן אסינכרוני. אפשר לבצע סוגים מסוימים של שיחות סטרימינג רק באמצעות stub אסינכרוני.

קודם צריך ליצור ערוץ gRPC בשביל קטע הקוד החלופי (stub), ולציין את כתובת השרת והיציאה שאליהם רוצים להתחבר:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

אנחנו משתמשים ב-ManagedChannelBuilder כדי ליצור את הערוץ.

עכשיו אפשר להשתמש בערוץ כדי ליצור את ה-stub באמצעות השיטות newStub ו-newBlockingStub שסופקו במחלקה RouteGuideGrpc שיצרנו מ-.proto.

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

חשוב לזכור שאם הפעולה לא חוסמת, היא אסינכרונית

קריאה לשיטות של שירותים

עכשיו נראה איך קוראים לשיטות של השירות. שימו לב שכל קריאה לשירות מרוחק (RPC) שנוצרה מתוך stub חוסם תפעל במצב חסימה/סינכרוני, כלומר הקריאה לשירות מרוחק תמתין לתגובה מהשרת, ותחזיר תגובה או שגיאה.

RPC של סטרימינג בצד השרת

עכשיו נסתכל על קריאה לסטרימינג בצד השרת אל ListFeatures, שמחזירה סטרימינג של נתונים גיאוגרפיים Feature:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

כפי שאפשר לראות, הוא דומה מאוד ל-RPC unary פשוט שראינו ב-Codelab Getting_Started_With_gRPC_Java, רק שבמקום להחזיר Feature יחיד, השיטה מחזירה Iterator שהלקוח יכול להשתמש בו כדי לקרוא את כל ה-Features שהוחזרו.

RPC של סטרימינג מצד הלקוח

עכשיו נסביר על שיטה קצת יותר מורכבת: שיטת הסטרימינג בצד הלקוח RecordRoute, שבה אנחנו שולחים לשרת זרם של Points ומקבלים בחזרה RouteSummary יחיד. כדי להשתמש בשיטה הזו, צריך להשתמש ב-stub אסינכרוני. אם כבר קראתם את המאמר בנושא יצירת השרת, יכול להיות שחלק מהמידע הזה ייראה לכם מוכר מאוד – הטמעה של RPCs אסינכרוניים להזרמת נתונים מתבצעת באופן דומה בשני הצדדים.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

כפי שאפשר לראות, כדי להפעיל את ה-method הזה צריך ליצור StreamObserver, שמטמיע ממשק מיוחד כדי שהשרת יוכל להפעיל אותו עם התגובה RouteSummary שלו. בStreamObserver שלנו אנחנו:

  • מחליפים את השיטה onNext() כדי להדפיס את המידע שמוחזר כשהשרת כותב RouteSummary לזרם ההודעות.
  • מבטלים את השיטה onCompleted() (שמופעלת כשהשרת משלים את השיחה בצד שלו) כדי לצמצם את CountDownLatch, וכך נוכל לבדוק אם השרת סיים לכתוב.

לאחר מכן מעבירים את StreamObserver לשיטת recordRoute() של ה-stub האסינכרוני ומקבלים בחזרה את רכיב הצפייה בבקשת StreamObserver כדי לכתוב את Points לשליחה לשרת. אחרי שסיימנו לכתוב נקודות, אנחנו משתמשים בשיטה onCompleted() של הכלי לבדיקת בקשות כדי להודיע ל-gRPC שסיימנו לכתוב בצד הלקוח. אחרי שמסיימים, בודקים את CountDownLatch כדי לראות אם השרת סיים את הפעולה בצד שלו.

Bidirectional streaming RPC

לבסוף, נבחן את ה-RPC של הסטרימינג הדו-כיווני RouteChat().

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

בדומה לדוגמה של סטרימינג בצד הלקוח, אנחנו מקבלים ומחזירים משקיף תגובה של StreamObserver, אבל הפעם אנחנו שולחים ערכים דרך משקיף התגובה של השיטה שלנו בזמן שהשרת עדיין כותב הודעות לזרם ההודעות שלו. התחביר לקריאה ולכתיבה כאן זהה בדיוק לזה של שיטת הזרמת הנתונים מהלקוח שלנו. למרות שכל צד תמיד יקבל את ההודעות של הצד השני בסדר שבו הן נכתבו, גם הלקוח וגם השרת יכולים לקרוא ולכתוב בכל סדר – הזרמים פועלים באופן עצמאי לחלוטין.

7. כדאי לנסות!

  1. מהספרייה start_here:
$ ./gradlew installDist

הקוד יעבור קומפילציה, ייארז בקובץ jar וייווצרו סקריפטים שיריצו את הדוגמה. הם ייווצרו בספרייה build/install/start_here/bin/. התסריטים הם: route-guide-server ו-route-guide-client.

צריך להפעיל את השרת לפני שמפעילים את הלקוח.

  1. מריצים את השרת:
$ ./build/install/start_here/bin/route-guide-server
  1. מריצים את הלקוח:
$ ./build/install/start_here/bin/route-guide-client

8. המאמרים הבאים