תחילת העבודה עם gRPC-Python – סטרימינג

1. מבוא

ב-codelab הזה תשתמשו ב-gRPC-Python כדי ליצור לקוח ושרת שיהוו את הבסיס לאפליקציה למיפוי מסלולים שנכתבה ב-Python.

בסוף המדריך יהיה לכם לקוח שמתחבר לשרת מרוחק באמצעות gRPC כדי לקבל מידע על תכונות במסלול של לקוח, ליצור סיכום של המסלול של הלקוח ולהחליף מידע על המסלול, כמו עדכוני תנועה, עם השרת ועם לקוחות אחרים.

השירות מוגדר בקובץ Protocol Buffers, שישמש ליצירת קוד boilerplate ללקוח ולשרת, כדי שהם יוכלו לתקשר זה עם זה. כך תוכלו לחסוך זמן ומאמץ בהטמעת הפונקציונליות הזו.

הקוד שנוצר מטפל לא רק במורכבויות של התקשורת בין השרת ללקוח, אלא גם בסריאליזציה ובדה-סריאליזציה של הנתונים.

מה תלמדו

  • איך משתמשים ב-Protocol Buffers כדי להגדיר API של שירות.
  • איך ליצור לקוח ושרת מבוססי gRPC מהגדרה של Protocol Buffers באמצעות יצירת קוד אוטומטית.
  • הבנה של תקשורת סטרימינג בין לקוח לשרת באמצעות gRPC.

ה-codelab הזה מיועד למפתחי Python שחדשים ב-gRPC או שרוצים לרענן את הידע שלהם ב-gRPC, או לכל מי שמעוניין ליצור מערכות מבוזרות. לא נדרש ניסיון קודם ב-gRPC.

‫2. לפני שמתחילים

הדרישות

  • Python בגרסה 3.9 ואילך. מומלץ להשתמש ב-Python 3.13. הוראות התקנה ספציפיות לפלטפורמה מופיעות במאמר Python Setup and Usage (הגדרה ושימוש ב-Python). אפשר גם להתקין Python שאינו מערכת באמצעות כלים כמו uv או pyenv.
  • pip להתקנת חבילות Python.
  • venv כדי ליצור סביבות וירטואליות של Python.

החבילות ensurepip ו-venv הן חלק מהספרייה הרגילה של Python, ובדרך כלל הן זמינות כברירת מחדל.

עם זאת, חלק מההפצות שמבוססות על Debian (כולל Ubuntu) בוחרות להחריג אותן כשמפיצים מחדש את Python. כדי להתקין את החבילות, מריצים את הפקודה:

sudo apt install python3-pip python3-venv

קבל את הקוד

כדי לייעל את הלמידה, ב-Codelab הזה מוצעת מסגרת קוד מקור מוכנה מראש שתעזור לכם להתחיל. בשלבים הבאים מוסבר איך להשלים את הבקשה, כולל יצירת קוד gRPC באמצעות התוסף grpc_tools.protoc Protocol Buffer compiler.

grpc-codelabs

קוד המקור של ה-scaffold ל-Codelab הזה זמין בספרייה codelabs/grpc-python-streaming/start_here. אם אתם מעדיפים לא להטמיע את הקוד בעצמכם, קוד המקור המלא זמין בספרייה completed.

קודם יוצרים את ספריית העבודה של ה-codelab ועוברים אליה:

mkdir grpc-python-streaming && cd grpc-python-streaming

מורידים ומחלצים את ה-codelab:

curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \
  | tar xvz --strip-components=4 \
  grpc-codelabs-1/codelabs/grpc-python-streaming/start_here

אפשרות אחרת היא להוריד את קובץ ה-‎ .zip שמכיל רק את ספריית ה-codelab ולבטל את הדחיסה שלו באופן ידני.

3. הגדרת הודעות ושירותים

השלב הראשון הוא להגדיר את שירות ה-gRPC של האפליקציה, את שיטת ה-RPC ואת סוגי ההודעות של הבקשה והתגובה באמצעות Protocol Buffers. השירות שלכם יספק:

  • שיטות RPC שנקראות ListFeatures, ‏ RecordRoute ו-RouteChat, שהשרת מטמיע והלקוח קורא להן.
  • סוגי ההודעות Point,‏ Feature,‏ Rectangle,‏ RouteNote ו-RouteSummary, שהן מבני נתונים שמועברים בין הלקוח לשרת כשמפעילים את שיטות ה-RPC.

כל שיטות ה-RPC האלה וסוגי ההודעות שלהן מוגדרים בקובץ protos/route_guide.proto של קוד המקור שסופק.

פרוטוקול Buffers ידוע בדרך כלל בשם protobufs. מידע נוסף על המינוח של gRPC זמין במאמר מושגים מרכזיים, ארכיטקטורה ומחזור חיים של gRPC.

הגדרת סוגי הודעות

בקובץ protos/route_guide.proto של קוד המקור, מגדירים קודם את סוג ההודעה Point. הסמל Point מייצג זוג קואורדינטות של קו רוחב וקו אורך במפה. ב-codelab הזה, משתמשים במספרים שלמים לקואורדינטות:

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

המספרים 1 ו-2 הם מספרי מזהים ייחודיים לכל אחד מהשדות במבנה message.

לאחר מכן, מגדירים את Feature סוג ההודעה. ‫Feature משתמש בשדה string כדי לציין את השם או הכתובת למשלוח דואר של משהו במיקום שצוין על ידי Point:

message Feature {
  // The name or address of the feature.
  string name = 1;

  // The point where the feature is located.
  Point location = 2;
}

כדי להזרים ללקוח כמה נקודות באזור מסוים, צריך להשתמש בהודעה Rectangle שמייצגת מלבן של קווי רוחב ואורך, שמיוצג על ידי שתי נקודות מנוגדות באלכסון lo ו-hi:

message Rectangle {
  // One corner of the rectangle.
  Point lo = 1;

  // The other corner of the rectangle.
  Point hi = 2;
}

בנוסף, הודעה RouteNote שמייצגת הודעה שנשלחה בנקודה מסוימת:

message RouteNote {
  // The location from which the message is sent.
  Point location = 1;

  // The message to be sent.
  string message = 2;
}

לבסוף, תצטרכו הודעה ב-RouteSummary. ההודעה הזו מתקבלת בתגובה ל-RPC‏ RecordRoute, שמוסבר בקטע הבא. הוא מכיל את מספר הנקודות הבודדות שהתקבלו, את מספר התכונות שזוהו ואת המרחק הכולל שעבר, כסכום מצטבר של המרחק בין כל נקודה.

message RouteSummary {
  // The number of points received.
  int32 point_count = 1;

  // The number of known features passed while traversing the route.
  int32 feature_count = 2;

  // The distance covered in metres.
  int32 distance = 3;

  // The duration of the traversal in seconds.
  int32 elapsed_time = 4;
}

הגדרת שיטות שירות

כדי להגדיר שירות, מציינים שירות עם שם בקובץ .proto. לקובץ route_guide.proto יש מבנה service בשם RouteGuide שמגדיר שיטה אחת או יותר שמוגדרות בשירות של האפליקציה.

כשמגדירים שיטות RPC בהגדרת השירות, מציינים את סוגי הבקשות והתגובות שלהן. בקטע הזה של ה-codelab, נגדיר:

ListFeatures

הפונקציה מחזירה את האובייקטים Feature שזמינים בתוך Rectangle הנתון. התוצאות מועברות בסטרימינג ולא מוחזרות בבת אחת, כי יכול להיות שהמלבן יכסה אזור גדול ויכיל מספר עצום של תכונות.

באפליקציה הזו, תשתמשו ב-RPC של סטרימינג בצד השרת: הלקוח שולח בקשה לשרת ומקבל סטרימינג כדי לקרוא רצף של הודעות בחזרה. הלקוח קורא מהזרם שהוחזר עד שאין יותר הודעות. כפי שאפשר לראות בדוגמה שלנו, כדי לציין שיטת סטרימינג בצד השרת, צריך להציב את מילת המפתח stream לפני סוג התגובה.

rpc ListFeatures(Rectangle) returns (stream Feature) {}

RecordRoute

מקבל פלט של נקודות במסלול שמוגדר, ומחזיר RouteSummary כשהמסלול מסתיים.

במקרה כזה, מתאים להשתמש ב-RPC של סטרימינג מצד הלקוח: הלקוח כותב רצף של הודעות ושולח אותן לשרת, שוב באמצעות סטרימינג שסופק. אחרי שהלקוח מסיים לכתוב את ההודעות, הוא מחכה שהשרת יקרא את כולן ויחזיר את התגובה שלו. כדי לציין שיטת סטרימינג בצד הלקוח, מציבים את מילת המפתח stream לפני סוג הבקשה.

rpc RecordRoute(stream Point) returns (RouteSummary) {}

RouteChat

מקבלת זרם של RouteNotes שנשלח בזמן שמתבצעת תנועה במסלול, תוך כדי קבלת RouteNotes אחרים (למשל ממשתמשים אחרים).

זה בדיוק תרחיש השימוש בסטרימינג דו-כיווני. בקשת RPC לסטרימינג דו-כיווני שבה שני הצדדים שולחים רצף של הודעות באמצעות סטרימינג לקריאה ולכתיבה. שני הזרמים פועלים באופן עצמאי, כך שהלקוחות והשרתים יכולים לקרוא ולכתוב בכל סדר שירצו: לדוגמה, השרת יכול לחכות עד שיקבל את כל ההודעות מהלקוח לפני שיכתוב את התשובות שלו, או שהוא יכול לקרוא הודעה ואז לכתוב הודעה, או כל שילוב אחר של קריאות וכתיבות. הסדר של ההודעות בכל זרם נשמר. כדי לציין את סוג ה-method הזה, צריך להוסיף את מילת המפתח stream לפני הבקשה ולפני התשובה.

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

4. יצירת קוד הלקוח והשרת

לאחר מכן, יוצרים את קוד ה-boilerplate של gRPC גם ללקוח וגם לשרת מקובץ .proto באמצעות מהדר מאגר אחסון לפרוטוקולים.

יצרנו את grpcio-tools כדי ליצור קוד gRPC Python. היא כוללת:

  1. הקומפיילר הרגיל protoc שיוצר קוד Python מהגדרות message.
  2. תוסף gRPC protobuf שיוצר קוד Python (קטעי קוד חלופי (stub) של לקוח ושרת) מההגדרות של service.

נתקין את חבילת Python‏ grpcio-tools באמצעות pip. כדאי ליצור סביבה וירטואלית של Python (venv) כדי לבודד את התלות של הפרויקט בחבילות המערכת:

python3 -m venv --upgrade-deps .venv

כדי להפעיל את הסביבה הווירטואלית במעטפת bash/zsh:

source .venv/bin/activate

בטבלה שבכתובת https://docs.python.org/3/library/venv.html#how-venvs-work מוסבר איך פועלים סביבות וירטואליות ב-Windows ובמעטפות לא סטנדרטיות.

לאחר מכן, מתקינים את grpcio-tools (הפעולה הזו מתקינה גם את החבילה grpcio):

pip install grpcio-tools

כדי ליצור את קוד ה-boilerplate של Python, משתמשים בפקודה הבאה:

python -m grpc_tools.protoc --proto_path=./protos  \
 --python_out=. --pyi_out=. --grpc_python_out=. \
 ./protos/route_guide.proto

הפעולה הזו תיצור את הקבצים הבאים לממשקים שהגדרנו ב-route_guide.proto:

  1. route_guide_pb2.py מכיל את הקוד שיוצר באופן דינמי כיתות שנוצרו מההגדרות של message.
  2. route_guide_pb2.pyi הוא קובץ stub או קובץ רמזים לסוג שנוצר מההגדרות של message. הוא מכיל רק את החתימות ללא הטמעה. סביבות פיתוח משולבות (IDE) יכולות להשתמש בקובצי stub כדי לספק השלמה אוטומטית טובה יותר וזיהוי שגיאות.
  3. route_guide_pb2_grpc.py נוצר מההגדרות של service ומכיל מחלקות ופונקציות ספציפיות ל-gRPC.

קוד ספציפי ל-gRPC מכיל:

  1. RouteGuideStub, שאפשר להשתמש בו על ידי לקוח gRPC כדי להפעיל RPC של RouteGuide.
  2. RouteGuideServicer, שמגדיר את הממשק להטמעות של שירות RouteGuide.
  3. הפונקציה add_RouteGuideServicer_to_server משמשת לרישום RouteGuideServicer ב-gRPC server.

5. יצירת השרת

קודם נראה איך יוצרים RouteGuide שרת. יצירה והפעלה של שרת RouteGuide מתחלקות לשני פריטי עבודה:

  • הטמעה של ממשק השרת שנוצר מהגדרת השירות עם פונקציות שמבצעות את ה "עבודה" בפועל של השירות.
  • הפעלת שרת gRPC להאזנה לבקשות מלקוחות ולהעברת תגובות.

בואו נבדוק את route_guide_server.py.

הטמעה של RouteGuide

route_guide_server.py יש מחלקה RouteGuideServicer שמסווגת משנה את המחלקה שנוצרה route_guide_pb2_grpc.RouteGuideServicer:

# RouteGuideServicer provides an implementation of the methods of the RouteGuide service.
class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer):

RouteGuideServicer מטמיע את כל שיטות השירות של RouteGuide.

הזרמת RPC בצד השרת

ListFeatures הוא RPC של סטרימינג תגובות ששולח כמה Feature ללקוח:

def ListFeatures(self, request, context):
    """List all features contained within the given Rectangle."""
    left = min(request.lo.longitude, request.hi.longitude)
    right = max(request.lo.longitude, request.hi.longitude)
    top = max(request.lo.latitude, request.hi.latitude)
    bottom = min(request.lo.latitude, request.hi.latitude)
    for feature in self.db:
        lat, lng = feature.location.latitude, feature.location.longitude
        if left <= lng <= right and bottom <= lat <= top:
            yield feature

בדוגמה הזו, הודעת הבקשה היא route_guide_pb2.Rectangle שבתוכה הלקוח רוצה למצוא Features. במקום להחזיר תשובה אחת, השיטה מחזירה אפס תשובות או יותר.

Client-side streaming RPC

השיטה RecordRoute של בקשת סטרימינג משתמשת באיטרטור של ערכי בקשה ומחזירה ערך היענות יחיד.

def RecordRoute(self, request_iterator, context):
    """Calculate statistics about the trip composed of Points."""
    point_count = 0
    feature_count = 0
    distance = 0.0
    prev_point = None

    start_time = time.time()
    for point in request_iterator:
        point_count += 1
        if get_feature(self.db, point):
            feature_count += 1
        if prev_point:
            distance += get_distance(prev_point, point)
        prev_point = point

    elapsed_time = time.time() - start_time
    return route_guide_pb2.RouteSummary(
        point_count=point_count,
        feature_count=feature_count,
        distance=int(distance),
        elapsed_time=int(elapsed_time),
    )

Bidirectional streaming RPC

לבסוף, נבחן את ה-RPC של הסטרימינג הדו-כיווני RouteChat():

def RouteChat(self, request_iterator, context):
    """
    Receive a stream of message/location pairs, and responds with
    a stream of all previous messages for the given location.
    """
    prev_notes = []
    for new_note in request_iterator:
        for prev_note in prev_notes:
            if prev_note.location == new_note.location:
                yield prev_note
        prev_notes.append(new_note)

הסמנטיקה של השיטה הזו היא שילוב של הסמנטיקה של השיטה request-streaming ושל השיטה response-streaming. הוא מקבל איטרטור של ערכי בקשות והוא עצמו איטרטור של ערכי תגובות.

הפעלת השרת

אחרי שמטמיעים את כל השיטות של RouteGuide, השלב הבא הוא להפעיל שרת gRPC כדי שהלקוחות יוכלו להשתמש בשירות:

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    route_guide_pb2_grpc.add_RouteGuideServicer_to_server(
        RouteGuideServicer(),
        server,
    )
    listen_addr = "localhost:50051"
    server.add_insecure_port(listen_addr)
    print(f"Starting server on {listen_addr}")
    server.start()
    server.wait_for_termination()

השיטה start() של השרת לא חוסמת. ייווצר שרשור חדש כדי לטפל בבקשות. לרוב, לשרשור שקורא ל-server.start() לא תהיה עבודה אחרת לעשות בינתיים. במקרה כזה, אפשר להתקשר אל server.wait_for_termination() כדי לחסום את שרשור השיחות בצורה נקייה עד שהשרת יסיים את הפעולה.

6. יצירת הלקוח

בואו נבדוק את route_guide_client.py.

יצירת stub

כדי לקרוא לשיטות של שירות, קודם צריך ליצור stub.

אנחנו יוצרים מופע של המחלקה RouteGuideStub מהמודול route_guide_pb2_grpc, שנוצר מהשיטה run() ב-.proto.:

with grpc.insecure_channel("localhost:50051") as channel:
    stub = route_guide_pb2_grpc.RouteGuideStub(channel)

שימו לב: כאן נעשה שימוש ב-channel כמנהל הקשר, והוא ייסגר אוטומטית ברגע שהמתורגמן יעזוב את הבלוק with.

הפעלת שיטות שירות

עבור שיטות RPC שמחזירות תגובה יחידה (שיטות response-unary), ‏ gRPC Python תומך בסמנטיקה של זרימת בקרה סינכרונית (חסימה) ואסינכרונית (לא חסימה). בשיטות RPC של סטרימינג תגובות, הקריאות מחזירות מיד איטרטור של ערכי תגובה. הקריאות לשיטה next() של האיטרטור הזה נחסמות עד שהתשובה שצריך להחזיר מהאיטרטור הופכת לזמינה.

הזרמת RPC בצד השרת

השימוש ב-response-streaming ListFeatures דומה לעבודה עם סוגי רצפים:

def guide_list_features(stub):
    _lo = route_guide_pb2.Point(latitude=400000000, longitude=-750000000)
    _hi = route_guide_pb2.Point(latitude=420000000, longitude=-730000000)
    rectangle = route_guide_pb2.Rectangle(
        lo=_lo,
        hi=_hi,
    )
    print("Looking for features between 40, -75 and 42, -73")

    features = stub.ListFeatures(rectangle)
    for feature in features:
        print(
            f"Feature called '{feature.name}'"
            f" at {format_point(feature.location)}"
        )

Client-side streaming RPC

הקריאה ל-request-streaming RecordRoute דומה להעברת איטרטור לשיטה מקומית. בדומה ל-RPC הפשוט שלמעלה שמחזיר גם תגובה יחידה, אפשר לקרוא לו באופן סינכרוני:

def guide_record_route(stub):
    feature_list = route_guide_resources.read_route_guide_database()
    route_iterator = generate_route(feature_list)

    route_summary = stub.RecordRoute(route_iterator)
    print(f"Finished trip with {route_summary.point_count} points")
    print(f"Passed {route_summary.feature_count} features")
    print(f"Traveled {route_summary.distance} meters")
    print(f"It took {route_summary.elapsed_time} seconds")

Bidirectional streaming RPC

הקריאה ל-RouteChat של סטרימינג דו-כיווני (כמו במקרה של צד השרת) כוללת שילוב של סמנטיקה של סטרימינג של בקשות וסטרימינג של תשובות.

יוצרים את הודעות הבקשה ושולחים אותן אחת אחת באמצעות yield.

def generate_notes():
    home = route_guide_pb2.Point(latitude=1, longitude=1)
    work = route_guide_pb2.Point(latitude=2, longitude=2)
    notes = [
        make_route_note("Departing from home", home),
        make_route_note("Arrived at work", work),
        make_route_note("Having lunch at work", work),
        make_route_note("Departing from work", work),
        make_route_note("Arrived home", home),
    ]
    for note in notes:
        print(
            f"Sending RouteNote for {format_point(note.location)}:"
            f" {note.message}"
        )
        yield note
        # Sleep to simulate moving from one point to another.
        # Only for demonstrating the order of the messages.
        time.sleep(0.1)

קבלת תגובות מהשרת ועיבוד שלהן:

def guide_route_chat(stub):
    responses = stub.RouteChat(generate_notes())
    for response in responses:
        print(
            "< Found previous note at"
            f" {format_point(response.location)}: {response.message}"
        )

הפעלת שיטות העזר

בפעולת ההרצה, מריצים את השיטות שיצרנו ומעבירים להן את stub.

print("-------------- ListFeatures --------------")
guide_list_features(stub)
print("-------------- RecordRoute --------------")
guide_record_route(stub)
print("-------------- RouteChat --------------")
guide_route_chat(stub)

7. רוצה לנסות?

מריצים את השרת:

python route_guide_server.py

ממסוף אחר, מפעילים שוב את הסביבה הווירטואלית (source .venv/bin/activate)), ואז מריצים את הלקוח:

python route_guide_client.py

בואו נסתכל על הפלט.

ListFeatures

קודם כל, תוצג רשימת התכונות. כל תכונה מועברת בסטרימינג מהשרת (RPC בצד השרת לסטרימינג) כשהמערכת מזהה שהיא נמצאת בתוך המלבן המבוקש:

-------------- ListFeatures --------------
Looking for features between 40, -75 and 42, -73
Feature called 'Patriots Path, Mendham, NJ 07945, USA' at (lat=407838351, lng=-746143763)
Feature called '101 New Jersey 10, Whippany, NJ 07981, USA' at (lat=408122808, lng=-743999179)
Feature called 'U.S. 6, Shohola, PA 18458, USA' at (lat=413628156, lng=-749015468)
Feature called '5 Conners Road, Kingston, NY 12401, USA' at (lat=419999544, lng=-740371136)
...

RecordRoute

שנית, RecordRoute מדגים את רשימת הנקודות שנבחרו באופן אקראי ומוזרמות מהלקוח לשרת (הזרמת RPC בצד הלקוח):

-------------- RecordRoute --------------
Visiting point (lat=410395868, lng=-744972325)
Visiting point (lat=404310607, lng=-740282632)
Visiting point (lat=403966326, lng=-748519297)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=406589790, lng=-743560121)
Visiting point (lat=410322033, lng=-747871659)
Visiting point (lat=415464475, lng=-747175374)
Visiting point (lat=407586880, lng=-741670168)
Visiting point (lat=402647019, lng=-747071791)
Visiting point (lat=414638017, lng=-745957854)

אחרי שהלקוח יסיים להזרים את כל הנקודות שביקר בהן, הוא יקבל מהשרת תגובה שלא מבוססת על סטרימינג (RPC אוניטרי). התגובה הזו תכיל סיכום של החישובים שבוצעו במסלול המלא של הלקוח.

Finished trip with 10 points
Passed 10 features
Traveled 654743 meters
It took 0 seconds

RouteChat

לבסוף, הפלט RouteChat מדגים סטרימינג דו-כיווני. כשהלקוח 'מבקר' בנקודות home או work, הוא שולח RouteNote לשרת כדי לתעד הערה לגבי הנקודה. אם כבר ביקרתם בנקודה מסוימת, השרת ישדר בחזרה את כל ההערות הקודמות לגבי הנקודה הזו.

-------------- RouteChat --------------
Sending RouteNote for (lat=1, lng=1): Departing from home
Sending RouteNote for (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Having lunch at work
< Found previous note at (lat=2, lng=2): Arrived at work
Sending RouteNote for (lat=2, lng=2): Departing from work
< Found previous note at (lat=2, lng=2): Arrived at work
< Found previous note at (lat=2, lng=2): Having lunch at work
Sending RouteNote for (lat=1, lng=1): Arrived home
< Found previous note at (lat=1, lng=1): Departing from home

8. המאמרים הבאים