Начало работы с gRPC-Java — Потоковая передача

1. Введение

В этом практическом занятии вы будете использовать gRPC-Java для создания клиента и сервера, которые составляют основу приложения для сопоставления маршрутов, написанного на Java.

К концу этого руководства у вас будет клиент, который подключается к удаленному серверу с помощью gRPC для получения информации о характеристиках маршрута клиента, создания сводки маршрута клиента и обмена информацией о маршруте, такой как обновления трафика, с сервером и другими клиентами.

Сервис определяется в файле Protocol Buffers, который будет использоваться для генерации шаблонного кода для клиента и сервера, чтобы они могли взаимодействовать друг с другом, экономя ваше время и усилия на реализации этой функциональности.

Сгенерированный код учитывает не только сложности взаимодействия между сервером и клиентом, но и сериализацию и десериализацию данных.

Что вы узнаете

  • Как использовать Protocol Buffers для определения API сервиса.
  • Как создать клиент и сервер на основе gRPC из определения Protocol Buffers с помощью автоматической генерации кода.
  • Понимание принципов потокового взаимодействия между клиентом и сервером с использованием gRPC.

Данный практический семинар предназначен для Java-разработчиков, которые только начинают работать с gRPC или хотят освежить свои знания gRPC, а также для всех, кто заинтересован в создании распределенных систем. Предварительный опыт работы с gRPC не требуется.

2. Прежде чем начать

Предварительные требования

  • Версия JDK 24.

Получите код

Чтобы вам не пришлось начинать с нуля, в этом практическом руководстве представлен шаблон исходного кода приложения, который вы сможете доработать. Следующие шаги покажут вам, как завершить приложение, включая использование плагинов компилятора Protocol Buffer для генерации шаблонного кода gRPC.

Сначала создайте рабочую директорию codelab и перейдите в неё с помощью команды cd:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

Скачайте и распакуйте CodeLab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

В качестве альтернативы вы можете скачать ZIP-архив, содержащий только папку codelab, и распаковать его вручную.

Полный исходный код доступен на GitHub, если вы хотите обойтись без ввода кода вручную.

3. Определите сообщения и сервисы.

Первым шагом является определение gRPC-сервиса приложения, его RPC-метода, а также типов сообщений запроса и ответа с помощью Protocol Buffers . Ваш сервис будет предоставлять:

  • RPC-методы ListFeatures , RecordRoute и RouteChat , которые реализует сервер, а вызывает клиент.
  • Типы сообщений Point , Feature , Rectangle , RouteNote и RouteSummary представляют собой структуры данных, которыми обмениваются клиент и сервер при вызове указанных выше методов.

Протоколы Protocol Buffers обычно называются protobufs. Для получения дополнительной информации о терминологии gRPC см. раздел «Основные концепции, архитектура и жизненный цикл gRPC».

Данный RPC-метод и типы сообщений для него будут определены в файле proto/routeguide/route_guide.proto предоставленного исходного кода.

Давайте создадим файл route_guide.proto .

Поскольку в этом примере мы генерируем Java-код, мы указали параметр java_package в нашем файле .proto :

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

Определите типы сообщений

В файле proto/routeguide/route_guide.proto исходного кода сначала определите тип сообщения Point . Point представляет собой пару координат широты и долготы на карте. Для этого практического задания используйте целые числа для координат:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

Цифры 1 и 2 — это уникальные идентификационные номера для каждого поля в структуре message .

Далее определите тип сообщения Feature . В Feature используется string поле для имени или почтового адреса объекта, расположенного в точке Point :

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

Для того чтобы можно было передавать данные из нескольких точек в пределах одной области клиенту, вам потребуется сообщение Rectangle , представляющее собой прямоугольник широты и долготы, отображаемый в виде двух точек lo и hi расположенных по диагонали друг от друга:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

Также сообщение RouteNote , представляющее собой сообщение, отправленное в заданной точке:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

Наконец, вам понадобится сообщение RouteSummary . Это сообщение принимается в ответ на RPC-запрос RecordRoute , который описан в следующем разделе. Оно содержит количество полученных отдельных точек, количество обнаруженных объектов и общее пройденное расстояние как сумма расстояний между каждой точкой.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

Определите методы обслуживания

Для определения сервиса необходимо указать именованный сервис в файле .proto . Файл route_guide.proto содержит структуру service с именем RouteGuide , которая определяет один или несколько методов, предоставляемых сервисом приложения.

При определении методов RPC внутри определения сервиса вы указываете их типы запроса и ответа. В этом разделе практического занятия давайте определим:

ListFeatures

Получает доступные объекты Feature внутри заданного Rectangle . Результаты передаются потоком, а не возвращаются сразу, поскольку прямоугольник может занимать большую площадь и содержать огромное количество объектов.

Для этого приложения вы будете использовать потоковый RPC на стороне сервера : клиент отправляет запрос на сервер и получает поток для чтения последовательности сообщений. Клиент читает из возвращенного потока до тех пор, пока не закончатся сообщения. Как вы можете видеть в нашем примере, вы указываете метод потоковой передачи на стороне сервера, помещая ключевое слово stream перед типом ответа.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

Принимает поток точек на проходимом маршруте и возвращает RouteSummary после завершения обхода.

В данном случае подходит RPC -запрос с потоковой передачей на стороне клиента : клиент записывает последовательность сообщений и отправляет их на сервер, снова используя предоставленный поток. После завершения записи сообщений клиент ожидает, пока сервер прочтет их все и вернет свой ответ. Метод потоковой передачи на стороне клиента указывается путем добавления ключевого слова stream перед типом запроса.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

Принимает поток RouteNotes , отправляемых во время прохождения маршрута, одновременно получая другие RouteNotes (например, от других пользователей).

Это именно тот случай, когда может пригодиться двунаправленная потоковая передача . Двунаправленный потоковый RPC-вызов, в котором обе стороны отправляют последовательность сообщений, используя поток чтения и записи. Два потока работают независимо, поэтому клиенты и серверы могут читать и записывать в любом порядке: например, сервер может дождаться получения всех сообщений от клиентов, прежде чем записывать свои ответы, или он может поочередно читать сообщение, а затем записывать сообщение, или использовать какую-либо другую комбинацию чтения и записи. Порядок сообщений в каждом потоке сохраняется. Этот тип метода указывается путем размещения ключевого слова stream перед запросом и ответом.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. Сгенерируйте код клиента и сервера.

Далее нам необходимо сгенерировать интерфейсы gRPC-клиента и сервера из определения сервиса в файле .proto . Мы делаем это с помощью компилятора Protocol Buffer protoc со специальным плагином gRPC для Java. Для генерации gRPC-сервисов необходимо использовать компилятор proto3 (который поддерживает синтаксис как proto2, так и proto3).

При использовании Gradle или Maven плагин сборки protoc может генерировать необходимый код в процессе сборки. Инструкции по генерации кода из собственных .proto файлов можно найти в файле README плагина grpc-java .

Мы предоставили конфигурацию Gradle.

Из каталога streaming-grpc-java-getting-started перейдите по ссылке:

$ chmod +x gradlew
$ ./gradlew generateProto

Следующие классы генерируются из определения нашего сервиса (в папке build/generated/sources/proto/main/java ):

  • Для каждого типа сообщений существует отдельный файл: Feature.java , Rectangle.java, ... содержащие весь код протокола Protocol Buffer для заполнения, сериализации и получения сообщений типа запроса и ответа.
  • Файл RouteGuideGrpc.java содержит (а также некоторый другой полезный код) базовый класс для реализации серверами RouteGuide , RouteGuideGrpc.RouteGuideImplBase , со всеми методами, определенными в сервисе RouteGuide , и заглушками для использования клиентами.

5. Внедрить сервис.

Для начала давайте рассмотрим, как создать сервер RouteGuide . Для того, чтобы наш сервис RouteGuide выполнял свою работу, необходимо выполнить две задачи:

  • Реализация интерфейса сервиса, сгенерированного на основе определения нашего сервиса: выполнение фактической «работы» нашего сервиса.
  • Запуск gRPC-сервера для приема запросов от клиентов и их перенаправления к соответствующей реализации сервиса.

Внедрить RouteGuide

Мы реализуем класс RouteGuideService , который будет наследовать сгенерированный класс RouteGuideGrpc.RouteGuideImplBase. Вот как будет выглядеть реализация.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

Давайте подробно рассмотрим каждую реализацию RPC.

Серверная потоковая передача RPC

Теперь давайте рассмотрим один из наших потоковых RPC-вызовов. ListFeatures — это потоковый RPC-вызов на стороне сервера, поэтому нам нужно отправить клиенту несколько Features .

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

Подобно простому RPC, этот метод получает объект запроса ( Rectangle в котором наш клиент хочет найти Features ) и наблюдатель ответа StreamObserver .

На этот раз мы получаем столько объектов Feature , сколько необходимо вернуть клиенту (в данном случае мы выбираем их из коллекции объектов сервиса в зависимости от того, находятся ли они внутри нашего Rectangle запроса), и по очереди записываем каждый из них в наблюдатель ответа, используя его метод onNext() . Наконец, как и в нашем простом RPC, мы используем метод onCompleted() наблюдателя ответа, чтобы сообщить gRPC, что мы закончили запись ответов.

RPC потоковой передачи на стороне клиента

Теперь давайте рассмотрим нечто немного более сложное: метод потоковой передачи данных на стороне клиента RecordRoute() , где мы получаем поток Points от клиента и возвращаем один RouteSummary с информацией о поездке.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

Как видите, подобно предыдущим типам методов, наш метод получает параметр StreamObserver responseObserver , но на этот раз он возвращает StreamObserver , чтобы клиент мог записывать свои Points .

В теле метода мы создаём анонимный StreamObserver который возвращает значение, и в этом случае мы:

  • Переопределите метод onNext() , чтобы получать информацию о характеристиках и другие данные каждый раз, когда клиент записывает Point в поток сообщений.
  • Переопределите метод onCompleted() (вызываемый после завершения записи сообщений клиентом ), чтобы заполнить и построить наш RouteSummary . Затем вызовите метод onNext() наблюдателя ответа нашего метода с нашим RouteSummary , а затем вызовите его метод onCompleted() , чтобы завершить вызов на стороне сервера.

Двунаправленный потоковый RPC

Наконец, давайте рассмотрим наш двунаправленный потоковый RPC RouteChat() .

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

Как и в нашем примере с потоковой передачей на стороне клиента, мы получаем и возвращаем StreamObserver , за исключением того, что на этот раз мы возвращаем значения через наблюдатель ответа нашего метода, пока клиент продолжает записывать сообщения в свой поток сообщений. Синтаксис для чтения и записи здесь точно такой же, как и для методов потоковой передачи на стороне клиента и сервера. Хотя каждая сторона всегда будет получать сообщения другой стороны в том порядке, в котором они были записаны, и клиент, и сервер могут читать и записывать в любом порядке — потоки работают полностью независимо.

Запустите сервер

После реализации всех наших методов нам также необходимо запустить gRPC-сервер, чтобы клиенты могли фактически использовать наш сервис. Следующий фрагмент кода показывает, как мы это делаем для нашего сервиса RouteGuide :

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

Как видите, мы создаём и запускаем наш сервер с помощью ServerBuilder .

Для этого мы:

  1. Укажите адрес и порт, которые мы хотим использовать для прослушивания запросов клиентов, с помощью метода forPort() конструктора.
  2. Создайте экземпляр класса реализации нашего сервиса RouteGuideService и передайте его методу addService() построителя.
  3. Вызовите build() и start() в построителе, чтобы создать и запустить RPC-сервер для нашего сервиса.

Поскольку ServerBuilder уже учитывает порт, единственная причина, по которой мы передаем порт, — это использование его для ведения журналов.

6. Создайте клиента.

В этом разделе мы рассмотрим создание клиента для нашего сервиса RouteGuide . Полный пример кода клиента можно найти в RouteGuideClient.java ../complete/src/main/java/io/grpc/complete/routeguide/ .

Создать экземпляр заглушки

Для вызова методов сервиса нам сначала нужно создать заглушку , или, точнее, две заглушки:

  • Блокирующий/синхронный заглушка: это означает, что вызов RPC ожидает ответа от сервера и либо вернет ответ, либо вызовет исключение.
  • Асинхронный/неблокирующий заглушка, выполняющий неблокирующие вызовы к серверу, при этом ответ возвращается асинхронно. Некоторые типы потоковых вызовов можно выполнять только с помощью асинхронной заглушки.

Сначала нам нужно создать gRPC- канал для нашего заглушки, указав адрес сервера и порт, к которому мы хотим подключиться:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

Для создания канала мы используем ManagedChannelBuilder .

Теперь мы можем использовать канал для создания наших заглушек, используя методы newStub и newBlockingStub , предоставленные в классе RouteGuideGrpc , который мы сгенерировали из нашего .proto .

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

Помните, если это не блокирующий процесс, значит, он асинхронный.

Методы вызова сервиса

Теперь давайте посмотрим, как мы вызываем методы нашего сервиса. Обратите внимание, что любые RPC-вызовы, созданные из блокирующего заглушки, будут работать в блокирующем/синхронном режиме, что означает, что RPC-вызов ожидает ответа от сервера и либо вернет ответ, либо ошибку.

Серверная потоковая передача RPC

Далее рассмотрим вызов функции ListFeatures на стороне сервера, который возвращает поток географических Feature :

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

Как видите, это очень похоже на простой унарный RPC, который мы рассматривали в практическом руководстве Getting_Started_With_gRPC_Java, за исключением того, что вместо возврата одного Feature , метод возвращает Iterator , который клиент может использовать для чтения всех возвращенных Features .

RPC потоковой передачи на стороне клиента

Теперь перейдём к чему-то немного более сложному: методу потоковой передачи на стороне клиента RecordRoute , где мы отправляем поток Points на сервер и получаем в ответ один RouteSummary . Для этого метода нам необходимо использовать асинхронный заглушечный метод. Если вы уже читали статью «Создание сервера» , некоторые моменты могут показаться вам очень знакомыми — асинхронные потоковые RPC-вызовы реализованы аналогичным образом с обеих сторон.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

Как видите, для вызова этого метода нам необходимо создать StreamObserver , который реализует специальный интерфейс, позволяющий серверу вызывать его с ответом RouteSummary . В нашем StreamObserver мы:

  • Переопределите метод onNext() , чтобы выводить возвращаемую информацию, когда сервер записывает RouteSummary в поток сообщений.
  • Переопределите метод onCompleted() (вызываемый, когда сервер завершает вызов на своей стороне), чтобы уменьшить количество CountDownLatch и проверить, завершил ли сервер запись.

Затем мы передаем StreamObserver методу recordRoute() асинхронного заглушки и получаем обратно наш собственный наблюдатель запросов StreamObserver для записи Points , которые будут отправлены на сервер. После завершения записи точек мы используем метод onCompleted() наблюдателя запросов, чтобы сообщить gRPC, что запись завершена на стороне клиента. После этого мы проверяем наш CountDownLatch , чтобы узнать, завершил ли сервер свою работу.

Двунаправленный потоковый RPC

Наконец, давайте рассмотрим наш двунаправленный потоковый RPC RouteChat() .

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

Как и в нашем примере с потоковой передачей на стороне клиента, мы получаем и возвращаем наблюдатель ответа StreamObserver , за исключением того, что на этот раз мы отправляем значения через наблюдатель ответа нашего метода, пока сервер еще записывает сообщения в свой поток сообщений. Синтаксис для чтения и записи здесь точно такой же, как и для нашего метода потоковой передачи на стороне клиента. Хотя каждая сторона всегда будет получать сообщения другой стороны в том порядке, в котором они были записаны, и клиент, и сервер могут читать и записывать в любом порядке — потоки работают полностью независимо.

7. Попробуйте!

  1. Из каталога start_here :
$ ./gradlew installDist

Это скомпилирует ваш код, упакует его в JAR-файл и создаст скрипты для запуска примера. Они будут созданы в каталоге build/install/start_here/bin/ . Скрипты называются: route-guide-server и route-guide-client .

Перед запуском клиента необходимо, чтобы сервер был запущен.

  1. Запустите сервер:
$ ./build/install/start_here/bin/route-guide-server
  1. Запустите клиент:
$ ./build/install/start_here/bin/route-guide-client

8. Что дальше?