Getting Started with gRPC-Java - Streaming

1. مقدمة

في هذا الدرس التطبيقي حول الترميز، ستستخدم gRPC-Java لإنشاء عميل وخادم يشكّلان الأساس لتطبيق يحدّد المسارات مكتوب بلغة Java.

في نهاية هذا الدليل التعليمي، سيكون لديك عميل يتصل بخادم بعيد باستخدام gRPC للحصول على معلومات حول الميزات على مسار العميل، وإنشاء ملخّص لمسار العميل، وتبادل معلومات المسار، مثل آخر الأخبار عن حركة المرور، مع الخادم والعملاء الآخرين.

يتم تحديد الخدمة في ملف بتنسيق Protocol Buffers، وسيتم استخدام هذا الملف لإنشاء رمز نص نموذجي للعميل والخادم حتى يتمكّنا من التواصل مع بعضهما البعض، ما يوفّر عليك الوقت والجهد في تنفيذ هذه الوظيفة.

لا يهتم هذا الرمز الذي تم إنشاؤه بتعقيدات الاتصال بين الخادم والعميل فحسب، بل أيضًا بتسلسل البيانات وإلغاء تسلسلها.

ماذا ستتعلّم؟

  • كيفية استخدام "مخازن البروتوكولات المؤقتة" (Protocol Buffers) لتحديد واجهة برمجة تطبيقات الخدمة
  • كيفية إنشاء عميل وخادم يستندان إلى gRPC استنادًا إلى "مخازن البروتوكولات المؤقتة" من خلال إنشاء الرموز البرمجية آليًا
  • فهم عملية التواصل بين العميل والخادم باستخدام gRPC

هذا الدرس التطبيقي حول الترميز موجّه لمطوّري Java المبتدئين في gRPC أو الذين يريدون تجديد معلوماتهم في المجال، أو أي شخص آخر مهتم بتطوير أنظمة موزّعة. لا يُشترط توفّر خبرة سابقة في gRPC.

2. قبل البدء

المتطلبات الأساسية

  • الإصدار 24 من JDK

الحصول على الشفرة‏

كي لا تضطر إلى البدء من الصفر تمامًا، يوفّر لك هذا الدرس التطبيقي حول الترميز بنية أساسية للرمز المصدر الخاص بالتطبيق لتتمكّن من إكماله. ستوضّح لك الخطوات التالية كيفية إكمال التطبيق، بما في ذلك استخدام مكوّنات برنامج تجميع مخازن البروتوكولات المؤقتة لإنشاء رمز gRPC النموذجي.

أولاً، أنشئ دليل عمل الدرس التطبيقي وادخله:

mkdir streaming-grpc-java-getting-started && cd streaming-grpc-java-getting-started

نزِّل الدرس التطبيقي حول الترميز واستخرِجه:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-java-streaming/start_here

بدلاً من ذلك، يمكنك تنزيل ملف ‎ .zip الذي يحتوي على دليل الدرس العملي فقط وفك ضغطه يدويًا.

يتوفّر الرمز المصدر المكتمل على GitHub إذا كنت تريد تخطّي كتابة عملية التنفيذ.

3- تحديد الرسائل والخدمات

تتمثّل خطوتك الأولى في تحديد خدمة gRPC للتطبيق وطريقة استدعاء إجراء عن بُعد (RPC) وأنواع رسائل الطلبات والردود باستخدام مخازن البروتوكولات المؤقتة. ستوفّر خدمتك ما يلي:

  • طُرق استدعاء الإجراء عن بُعد التي تحمل الأسماء ListFeatures وRecordRoute وRouteChat والتي ينفّذها الخادم ويستدعيها العميل
  • أنواع الرسائل Point وFeature وRectangle وRouteNote وRouteSummary، وهي عبارة عن بنى بيانات يتم تبادلها بين العميل والخادم عند استدعاء الطرق المذكورة أعلاه.

يُشار إلى "مخازن البروتوكولات المؤقتة" عادةً باسم protobufs. لمزيد من المعلومات عن مصطلحات gRPC، يُرجى الاطّلاع على المفاهيم الأساسية والبنية ودورة الحياة في gRPC.

سيتم تحديد طريقة "استدعاء الإجراء عن بُعد" هذه وأنواع الرسائل الخاصة بها في ملف proto/routeguide/route_guide.proto الخاص بالرمز المصدر المقدَّم.

لننشئ ملف route_guide.proto.

بما أنّنا ننشئ رمز Java في هذا المثال، حدّدنا خيار الملف java_package في .proto:

option java_package = "io.grpc.examples.routeguide";
option java_outer_classname = "RouteGuideProto";

تحديد أنواع الرسائل

في ملف proto/routeguide/route_guide.proto الخاص بالرمز المصدر، حدِّد أولاً نوع رسالة Point. يمثّل Point زوج إحداثيات خط العرض وخط الطول على الخريطة. في هذا الدرس التطبيقي، استخدِم أعدادًا صحيحة للإحداثيات:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

الرقمان 1 و2 هما رقما تعريف فريدان لكل حقل من الحقول في بنية message.

بعد ذلك، حدِّد نوع رسالة Feature. يستخدم Feature الحقل string للاسم أو العنوان البريدي الخاص بمكان محدّد من خلال Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

لكي يتم بث نقاط متعددة ضمن منطقة إلى أحد العملاء، ستحتاج إلى رسالة Rectangle تمثّل مستطيلاً لخطوط الطول والعرض، ويتم تمثيله كنقطتَين متقابلتَين قطريًا lo وhi:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

بالإضافة إلى ذلك، رسالة RouteNote تمثّل رسالة تم إرسالها في وقت محدّد:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

أخيرًا، ستحتاج إلى رسالة RouteSummary. يتم تلقّي هذه الرسالة ردًا على طلب إجراء RecordRoute RPC، ويتم توضيح ذلك في القسم التالي. يحتوي هذا الملف على عدد النقاط الفردية التي تم تلقّيها، وعدد الميزات التي تم رصدها، وإجمالي المسافة المقطوعة كمجموع تراكمي للمسافة بين كل نقطة.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

تحديد طرق الخدمة

لتحديد خدمة، عليك تحديد خدمة مسماة في ملف .proto. يحتوي الملف route_guide.proto على بنية service باسم RouteGuide تحدّد طريقة واحدة أو أكثر توفّرها خدمة التطبيق.

عند تحديد طرق RPC داخل تعريف الخدمة، عليك تحديد أنواع الطلبات والردود. في هذا القسم من الدرس العملي، لنحدّد ما يلي:

ListFeatures

يحصل على عناصر Feature المتوفّرة ضمن Rectangle المحدّد. يتم بث النتائج بدلاً من عرضها دفعة واحدة لأنّ المستطيل قد يغطي مساحة كبيرة ويحتوي على عدد كبير من العناصر.

بالنسبة إلى هذا التطبيق، ستستخدم عملية بث من جهة الخادم لاستدعاء إجراء عن بُعد (RPC): يرسل العميل طلبًا إلى الخادم ويتلقّى بثًا لقراءة سلسلة من الرسائل. يقرأ العميل من البث الذي تم إرجاعه إلى أن لا تتبقى أي رسائل. كما هو موضّح في مثالنا، يمكنك تحديد طريقة البث من جهة الخادم من خلال وضع الكلمة الرئيسية stream قبل نوع الاستجابة.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

تقبل هذه السمة مجموعة من النقاط على مسار يتم اجتيازه، وتعرض RouteSummary عند اكتمال عملية الاجتياز.

في هذه الحالة، يكون طلب إجراء مكالمة RPC ببث من جهة العميل مناسبًا: يكتب العميل سلسلة من الرسائل ويرسلها إلى الخادم، وذلك باستخدام بث يتم توفيره مرة أخرى. بعد أن ينتهي العميل من كتابة الرسائل، ينتظر أن يقرأها الخادم كلها ويرسل رده. يمكنك تحديد طريقة البث من جهة العميل عن طريق وضع الكلمة الرئيسية stream قبل نوع الطلب.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

يقبل هذا النوع من الطلبات بثًا من RouteNotes يتم إرساله أثناء التنقّل في مسار، مع تلقّي RouteNotes أخرى (مثل تلك الواردة من مستخدمين آخرين).

هذه هي بالضبط حالة الاستخدام للبث الثنائي الاتجاه. هي استدعاء إجراء عن بُعد (RPC) لبث ثنائي الاتجاه، حيث يرسل كلا الجانبين سلسلة من الرسائل باستخدام بث للقراءة والكتابة. يعمل كل من هذين التدفقين بشكل مستقل، لذا يمكن للعملاء والخوادم القراءة والكتابة بأي ترتيب يريدونه: على سبيل المثال، يمكن للخادم الانتظار لتلقّي جميع رسائل العميل قبل كتابة ردوده، أو يمكنه بدلاً من ذلك قراءة رسالة ثم كتابة رسالة، أو أي مجموعة أخرى من عمليات القراءة والكتابة. يتم الحفاظ على ترتيب الرسائل في كل بث. يمكنك تحديد هذا النوع من الطرق من خلال وضع الكلمة الرئيسية stream قبل كلّ من الطلب والاستجابة.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. إنشاء رمز العميل والخادم

بعد ذلك، علينا إنشاء واجهات عميل وخادم gRPC من تعريف الخدمة .proto. وننفّذ ذلك باستخدام برنامج تجميع Protocol Buffers protoc مع مكوّن إضافي خاص بـ gRPC Java. عليك استخدام برنامج تجميع proto3 (الذي يتوافق مع بنية proto2 وproto3) لإنشاء خدمات gRPC.

عند استخدام Gradle أو Maven، يمكن لمكوّن protoc الإضافي للإنشاء إنشاء الرمز البرمجي اللازم كجزء من عملية الإنشاء. يمكنك الرجوع إلى ملف README الخاص بـ grpc-java لمعرفة كيفية إنشاء الرمز من ملفات .proto الخاصة بك.

لقد قدّمنا إعدادات Gradle.

من دليل streaming-grpc-java-getting-started، أدخِل

$ chmod +x gradlew
$ ./gradlew generateProto

يتم إنشاء الفئات التالية من تعريف الخدمة (ضمن build/generated/sources/proto/main/java):

  • ملف واحد لكل نوع رسالة: Feature.java وRectangle.java, ...، يحتويان على كل رمز Protocol Buffers اللازم لتعبئة أنواع رسائل الطلبات والردود ونشرها على نحو متسلسِل واستردادها.
  • RouteGuideGrpc.java الذي يحتوي (بالإضافة إلى بعض الرموز المفيدة الأخرى) على صنف أساسي للخوادم RouteGuide لتنفيذه، RouteGuideGrpc.RouteGuideImplBase، مع جميع الطرق المحددة في خدمة RouteGuide وفئات الرمز البديل للعملاء لاستخدامها

5. تنفيذ الخدمة

لنلقِ نظرة أولاً على كيفية إنشاء خادم RouteGuide. يتضمّن إنجاح خدمة RouteGuide جزأين:

  • تنفيذ واجهة الخدمة التي تم إنشاؤها من تعريف الخدمة: تنفيذ "العمل" الفعلي لخدمتنا
  • تشغيل خادم gRPC للاستماع إلى الطلبات من العملاء وإرسالها إلى التنفيذ الصحيح للخدمة

تنفيذ RouteGuide

سننفّذ فئة RouteGuideService ستوسّع فئة RouteGuideGrpc.RouteGuideImplBase التي تم إنشاؤها. إليك طريقة التنفيذ.

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
        ...
}

public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {

        ...
}

public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {

        ...
}

لنلقِ نظرة على كل عملية تنفيذ لطلب إجراء عن بُعد بالتفصيل.

استدعاء إجراء عن بُعد (RPC) للبث من جهة الخادم

لنلقِ نظرة الآن على أحد إجراءات RPC الخاصة بالبث. ‫ListFeatures هو إجراء عن بُعد (RPC) لبث البيانات من جهة الخادم، لذا علينا إرسال عدة Features إلى العميل.

private final Collection<Feature> features;

@Override
public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
  int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
  int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
  int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
  int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

  for (Feature feature : features) {
    if (!RouteGuideUtil.exists(feature)) {
      continue;
    }

    int lat = feature.getLocation().getLatitude();
    int lon = feature.getLocation().getLongitude();
    if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
      responseObserver.onNext(feature);
    }
  }
  responseObserver.onCompleted();
}

كما هو الحال في عملية استدعاء الإجراء عن بُعد البسيطة، تحصل هذه الطريقة على كائن طلب (Rectangle الذي يريد العميل العثور على Features فيه) ومراقب ردّ StreamObserver.

في هذه المرة، نحصل على أكبر عدد ممكن من عناصر Feature التي نحتاج إلى إرجاعها إلى العميل (في هذه الحالة، نختارها من مجموعة ميزات الخدمة استنادًا إلى ما إذا كانت داخل طلب Rectangle أم لا)، ونكتبها بالتناوب في مراقب الاستجابة باستخدام طريقة onNext(). أخيرًا، كما هو الحال في إجراء RPC البسيط، نستخدم الطريقة onCompleted() الخاصة بمراقب الردود لإعلام gRPC بأنّنا انتهينا من كتابة الردود.

استدعاء إجراء عن بُعد للبث من جهة العميل

لنلقِ نظرة الآن على طريقة أكثر تعقيدًا: طريقة البث من جهة العميل RecordRoute()، حيث نحصل على بث من Points من العميل ونعرض RouteSummary واحدًا يتضمّن معلومات حول رحلته.

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
  return new StreamObserver<Point>() {
    int pointCount;
    int featureCount;
    int distance;
    Point previous;
    long startTime = System.nanoTime();

    @Override
    public void onNext(Point point) {
      pointCount++;
      if (RouteGuideUtil.exists(checkFeature(point))) {
        featureCount++;
      }
      // For each point after the first, add the incremental distance from the previous point
      // to the total distance value.
      if (previous != null) {
        distance += calcDistance(previous, point);
      }
      previous = point;
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in recordRoute", t);
    }

    @Override
    public void onCompleted() {
      long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
      responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
          .setFeatureCount(featureCount).setDistance(distance)
          .setElapsedTime((int) seconds).build());
      responseObserver.onCompleted();
    }
  };
}

كما ترى، مثل أنواع الطرق السابقة، تتلقّى طريقتنا المَعلمة StreamObserver responseObserver، ولكنها تعرض هذه المرة StreamObserver لكي يكتب العميل Points.

في نص الدالة، ننشئ StreamObserver مجهول الاسم لعرضه، حيث:

  • يمكنك إلغاء طريقة onNext() للحصول على الميزات والمعلومات الأخرى في كل مرة يكتب فيها العميل Point في بث الرسائل.
  • تجاوز طريقة onCompleted() (يتم استدعاؤها عندما ينتهي العميل من كتابة الرسائل) لملء وإنشاء RouteSummary. بعد ذلك، نستدعي onNext() في مراقب الردود الخاص بطريقتنا مع RouteSummary، ثم نستدعي الطريقة onCompleted() لإنهاء المكالمة من جهة الخادم.

استدعاء إجراء عن بُعد (RPC) باستخدام البث الثنائي الاتجاه

أخيرًا، لنلقِ نظرة على استدعاء الإجراء عن بُعد (RPC) RouteChat() الذي يتيح البث الثنائي الاتجاه.

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
  return new StreamObserver<RouteNote>() {
    @Override
    public void onNext(RouteNote note) {
      List<RouteNote> notes = getOrCreateNotes(note.getLocation());

      // Respond with all previous notes at this location.
      for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
        responseObserver.onNext(prevNote);
      }

      // Now add the new note to the list
      notes.add(note);
    }

    @Override
    public void onError(Throwable t) {
      logger.log(Level.WARNING, "Encountered error in routeChat", t);
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }
  };
}

كما هو الحال مع مثال البث من جهة العميل، نحصل على StreamObserver ونعرضه، ولكن هذه المرة نعرض القيم من خلال مراقب الردّ الخاص بالطريقة بينما لا يزال العميل يكتب الرسائل إلى بث الرسائل الخاص به. إنّ بنية القراءة والكتابة هنا هي نفسها تمامًا كما هو الحال في طريقتَي البث من الخادم والبث من العميل. على الرغم من أنّ كل طرف سيتلقّى دائمًا رسائل الطرف الآخر بالترتيب الذي تمت كتابتها به، يمكن لكل من العميل والخادم القراءة والكتابة بأي ترتيب، إذ تعمل عمليات البث بشكل مستقل تمامًا.

ابدأ الخادم

بعد تنفيذ جميع طرقنا، نحتاج أيضًا إلى بدء تشغيل خادم gRPC حتى يتمكّن العملاء من استخدام خدمتنا. يوضّح المقتطف التالي كيفية إجراء ذلك في خدمة RouteGuide:

public RouteGuideServer(int port, URL featureFile) throws IOException {
  this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}

/** Create a RouteGuide server using serverBuilder as a base and features as data. */
public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
  this.port = port;
  server = serverBuilder.addService(new RouteGuideService(features))
      .build();
}
public void start() throws IOException {
  server.start();
  logger.info("Server started, listening on " + port);
}

كما ترى، ننشئ خادمنا ونبدأ تشغيله باستخدام ServerBuilder.

لإجراء ذلك، ننفّذ ما يلي:

  1. حدِّد العنوان والمنفذ اللذين نريد استخدامهما للاستماع إلى طلبات العميل باستخدام طريقة forPort() الخاصة بأداة الإنشاء.
  2. أنشئ مثيلاً لفئة تنفيذ الخدمة RouteGuideService ومرِّره إلى طريقة addService() الخاصة بأداة الإنشاء.
  3. استدعِ build() وstart() في أداة الإنشاء لإنشاء خادم RPC وبدء تشغيله لخدمتنا.

بما أنّ ServerBuilder يتضمّن المنفذ، السبب الوحيد الذي يجعلنا نمرّر منفذًا هو استخدامه في التسجيل.

6. إنشاء العميل

في هذا القسم، سنلقي نظرة على كيفية إنشاء برنامج لخدمة RouteGuide. يمكنك الاطّلاع على نموذج رمز العميل الكامل في ../complete/src/main/java/io/grpc/complete/routeguide/ RouteGuideClient.java.

إنشاء مثيل من رمز بديل

لاستدعاء طرق الخدمة، علينا أولاً إنشاء رمز بديل، أو بالأحرى رمزان بديلان:

  • رمز بديل حظر/تزامن: يعني ذلك أنّ طلب استدعاء الإجراء عن بُعد (RPC) ينتظر رد الخادم، وسيعرض إما ردًا أو يثير استثناءً.
  • رمز بديل غير حظر/غير متزامن ينفّذ طلبات غير حظر إلى الخادم، ويتم عرض الردّ بشكل غير متزامن. يمكنك إجراء أنواع معيّنة من مكالمات البث فقط باستخدام رمز بديل غير متزامن.

أولاً، علينا إنشاء قناة gRPC للرمز البديل، مع تحديد عنوان الخادم والمنفذ اللذين نريد الاتصال بهما:

  public static void main(String[] args) throws InterruptedException {
    String target = "localhost:8980";
    if (args.length > 0) {
      if ("--help".equals(args[0])) {
        System.err.println("Usage: [target]");
        System.err.println("");
        System.err.println("  target  The server to connect to. Defaults to " + target);
        System.exit(1);
      }
      target = args[0];
    }

    List<Feature> features;
    try {
      features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
    } catch (IOException ex) {
      ex.printStackTrace();
      return;
    }

    ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
        .build();
    try {
      RouteGuideClient client = new RouteGuideClient(channel);

      // Looking for features between 40, -75 and 42, -73.
      client.listFeatures(400000000, -750000000, 420000000, -730000000);

      // Record a few randomly selected points from the features file.
      client.recordRoute(features, 10);

      // Send and receive some notes.
      CountDownLatch finishLatch = client.routeChat();

      if (!finishLatch.await(1, TimeUnit.MINUTES)) {
        client.warning("routeChat did not finish within 1 minutes");
      }
    } finally {
      channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
    }
  }

نستخدم ManagedChannelBuilder لإنشاء القناة.

يمكننا الآن استخدام القناة لإنشاء الرموز البديلة باستخدام طريقتَي newStub وnewBlockingStub المتوفّرتَين في فئة RouteGuideGrpc التي أنشأناها من .proto.

public RouteGuideClient(Channel channel) {
    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
  }

تذكَّر أنّه إذا لم يكن الحظر متزامنًا، سيكون غير متزامن

طُرق خدمات الاستدعاء

لنلقِ نظرة الآن على كيفية استدعاء طرق الخدمة. يُرجى العِلم أنّ أي طلبات استدعاء إجراء عن بُعد (RPC) يتم إنشاؤها من stub الحظر ستعمل في وضع الحظر/التزامن، ما يعني أنّ طلب RPC ينتظر رد الخادم، وسيعرض إما ردًا أو خطأ.

استدعاء إجراء عن بُعد (RPC) للبث من جهة الخادم

لنلقِ نظرة الآن على طلب بث من جهة الخادم إلى ListFeatures، والذي يعرض بثًا Feature جغرافيًا:

Rectangle request = Rectangle.newBuilder()
             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
        .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator<Feature> features;
try {
  features = blockingStub.listFeatures(request);
} catch (StatusRuntimeException e) {
  logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
  return;
}

كما ترى، يشبه هذا النوع إلى حد كبير عملية RPC الأحادية البسيطة التي تناولناها في درس تطبيقي حول الترميز Getting_Started_With_gRPC_Java، ولكن بدلاً من عرض Feature واحد، تعرض الطريقة Iterator يمكن للعميل استخدامه لقراءة كل Features المعروضة.

استدعاء إجراء عن بُعد للبث من جهة العميل

والآن، لننتقل إلى طريقة أكثر تعقيدًا: طريقة البث من جهة العميل RecordRoute، حيث نرسل مجموعة من Points إلى الخادم ونتلقّى RouteSummary واحدًا. بالنسبة إلى هذه الطريقة، علينا استخدام رمز بديل غير متزامن. إذا سبق لك قراءة إنشاء الخادم، قد يبدو بعض هذا المحتوى مألوفًا جدًا، إذ يتم تنفيذ طلبات RPC المتدفّقة غير المتزامنة بطريقة مشابهة على كلا الجانبين.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
  info("*** RecordRoute");
  final CountDownLatch finishLatch = new CountDownLatch(1);
  StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {

    @Override
    public void onNext(RouteSummary summary) {
      info("Finished trip with {0} points. Passed {1} features. "
          + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
          summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
    }

    @Override
    public void onError(Throwable t) {
      Status status = Status.fromThrowable(t);
      logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
      finishLatch.countDown();
    }

    @Override
    public void onCompleted() {
      info("Finished RecordRoute");
      finishLatch.countDown();
    }
  };

  StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
  try {
    // Send numPoints points randomly selected from the features list.
    Random rand = new Random();
    for (int i = 0; i < numPoints; ++i) {
      int index = rand.nextInt(features.size());
      Point point = features.get(index).getLocation();
      info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
          RouteGuideUtil.getLongitude(point));
      requestObserver.onNext(point);
      // Sleep for a bit before sending the next one.
      Thread.sleep(rand.nextInt(1000) + 500);
      if (finishLatch.getCount() == 0) {
        // RPC completed or errored before we finished sending.
        // Sending further requests won't error, but they will just be thrown away.
        return;
      }
    }
  } catch (RuntimeException e) {
    // Cancel RPC
    requestObserver.onError(e);
    throw e;
  }
  // Mark the end of requests
  requestObserver.onCompleted();

  // Receiving happens asynchronously
  finishLatch.await(1, TimeUnit.MINUTES);
}

كما ترى، لاستدعاء هذه الطريقة، علينا إنشاء StreamObserver، الذي ينفّذ واجهة خاصة للخادم لاستدعائها باستخدام الرد RouteSummary. في StreamObserver، نعمل على:

  • تجاهل طريقة onNext() لطباعة المعلومات التي تم إرجاعها عندما يكتب الخادم RouteSummary إلى بث الرسائل.
  • تجاوز طريقة onCompleted() (يتم استدعاؤها عندما يكمل الخادم المكالمة من جهته) لتقليل CountDownLatch حتى نتمكّن من التحقّق مما إذا كان الخادم قد انتهى من الكتابة.

بعد ذلك، نمرّر StreamObserver إلى طريقة recordRoute() في العنصر النائب غير المتزامن ونستردّ مراقب طلب StreamObserver الخاص بنا لكتابة Points لإرساله إلى الخادم. بعد الانتهاء من كتابة النقاط، نستخدم طريقة onCompleted() الخاصة بمراقب الطلب لإعلام gRPC بأنّنا انتهينا من الكتابة على جهة العميل. بعد الانتهاء، نتحقّق من CountDownLatch لمعرفة ما إذا كان الخادم قد أكمل العملية من جهته.

استدعاء إجراء عن بُعد (RPC) باستخدام البث الثنائي الاتجاه

أخيرًا، لنلقِ نظرة على استدعاء الإجراء عن بُعد (RPC) RouteChat() الذي يتيح البث الثنائي الاتجاه.

public CountDownLatch routeChat() {
    info("*** RouteChat");
    final CountDownLatch finishLatch = new CountDownLatch(1);
    StreamObserver<RouteNote> requestObserver =
        asyncStub.routeChat(new StreamObserver<RouteNote>() {
          @Override
          public void onNext(RouteNote note) {
            info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
                .getLatitude(), note.getLocation().getLongitude());
          }

          @Override
          public void onError(Throwable t) {
            warning("RouteChat Failed: {0}", Status.fromThrowable(t));
            finishLatch.countDown();
          }

          @Override
          public void onCompleted() {
            info("Finished RouteChat");
            finishLatch.countDown();
          }
        });

    try {
      RouteNote[] requests =
          {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
              newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};

      for (RouteNote request : requests) {
        info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
            .getLatitude(), request.getLocation().getLongitude());
        requestObserver.onNext(request);
      }
    } catch (RuntimeException e) {
      // Cancel RPC
      requestObserver.onError(e);
      throw e;
    }
    // Mark the end of requests
    requestObserver.onCompleted();

    // return the latch while receiving happens asynchronously
    return finishLatch;
  }

كما هو الحال مع مثال البث من جهة العميل، نحصل على مراقب ردود ونعرضه، ولكن هذه المرة نرسل القيم من خلال مراقب الردود الخاص بالطريقة بينما لا يزال الخادم يكتب الرسائل إلى بث الرسائل الخاص به.StreamObserver إنّ بناء الجملة الخاص بالقراءة والكتابة هنا هو نفسه تمامًا كما هو الحال في طريقة البث من جهة العميل. على الرغم من أنّ كل طرف سيتلقّى دائمًا رسائل الطرف الآخر بالترتيب الذي تمت كتابتها به، يمكن لكل من العميل والخادم القراءة والكتابة بأي ترتيب، إذ تعمل عمليات البث بشكل مستقل تمامًا.

7. ننصحكم بتجربتها.

  1. من دليل start_here:
$ ./gradlew installDist

سيؤدي ذلك إلى تجميع الرمز البرمجي وتعبئته في ملف jar وإنشاء البرامج النصية التي تشغّل المثال. سيتم إنشاؤها في دليل build/install/start_here/bin/. البرنامجان النصيان هما: route-guide-server وroute-guide-client.

يجب أن يكون الخادم قيد التشغيل قبل بدء تشغيل العميل.

  1. شغِّل الخادم:
$ ./build/install/start_here/bin/route-guide-server
  1. شغِّل العميل:
$ ./build/install/start_here/bin/route-guide-client

8. الخطوات التالية