1. Введение
В этом практическом занятии вы будете использовать gRPC-Rust для создания клиента и сервера, которые составляют основу приложения для сопоставления маршрутов, написанного на Rust.
К концу этого руководства у вас будет клиент, который подключается к удаленному серверу с помощью gRPC для получения информации о характеристиках маршрута клиента, создания сводки маршрута клиента и обмена информацией о маршруте, такой как обновления трафика, с сервером и другими клиентами.
Сервис определяется в файле Protocol Buffers, который будет использоваться для генерации шаблонного кода для клиента и сервера, чтобы они могли взаимодействовать друг с другом, экономя ваше время и усилия на реализации этой функциональности.
Сгенерированный код учитывает не только сложности взаимодействия между сервером и клиентом, но и сериализацию и десериализацию данных.
Что вы узнаете
- Как использовать Protocol Buffers для определения API сервиса.
- Как создать клиент и сервер на основе gRPC из определения Protocol Buffers с помощью автоматической генерации кода.
- Понимание принципов потокового взаимодействия между клиентом и сервером с использованием gRPC.
Данный практический семинар предназначен для разработчиков на Rust, которые только начинают работать с gRPC или хотят освежить свои знания gRPC, а также для всех, кто заинтересован в создании распределенных систем. Предварительный опыт работы с gRPC не требуется.
2. Прежде чем начать
Предварительные требования
Убедитесь, что у вас установлены следующие компоненты:
Получите код
Чтобы вам не пришлось начинать с нуля, в этом практическом руководстве представлен шаблон исходного кода приложения, который вы сможете доработать. Следующие шаги покажут вам, как завершить приложение, включая использование плагинов компилятора Protocol Buffer для генерации шаблонного кода gRPC.
Сначала создайте рабочую директорию codelab и перейдите в неё с помощью cd :
mkdir streaming-grpc-rust-getting-started && cd streaming-grpc-rust-getting-started
Скачайте и распакуйте CodeLab:
curl -sL https://github.com/grpc-ecosystem/grpc-codelabs/archive/refs/heads/v1.tar.gz \ | tar xvz --strip-components=4 \ grpc-codelabs-1/codelabs/grpc-rust-streaming/start_here
В качестве альтернативы вы можете скачать ZIP-архив, содержащий только папку codelab, и распаковать его вручную.
Полный исходный код доступен на GitHub, если вы хотите обойтись без ввода кода вручную.
3. Определите сообщения и сервисы.
Первым шагом является определение gRPC-сервиса приложения, его RPC-методов, а также типов сообщений запроса и ответа с помощью Protocol Buffers . Ваш сервис будет предоставлять:
- RPC-методы
ListFeatures,RecordRouteиRouteChat, которые реализует сервер, а вызывает клиент. - Типы сообщений
Point,Feature,Rectangle,RouteNoteиRouteSummaryпредставляют собой структуры данных, которыми обмениваются клиент и сервер при вызове указанных выше методов.
Все эти RPC-методы и соответствующие им типы сообщений будут определены в файле proto/routeguide.proto предоставленного исходного кода.
Протоколы Protocol Buffers обычно называются protobufs. Для получения дополнительной информации о терминологии gRPC см. раздел «Основные концепции, архитектура и жизненный цикл gRPC».
Определение типов сообщений
Давайте сначала определим сообщения, которые будут использоваться нашими RPC-вызовами. В файле routeguide/route_guide.proto исходного кода сначала определите тип сообщения Point . Point представляет собой пару координат широты и долготы на карте. Для этого практического задания используйте целые числа для координат:
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
Цифры 1 и 2 — это уникальные идентификационные номера для каждого поля в структуре message .
Далее определите тип сообщения Feature . В Feature используется string поле для имени или почтового адреса объекта, расположенного в точке Point :
message Feature {
// The name or address of the feature.
string name = 1;
// The point where the feature is located.
Point location = 2;
}
Далее следует сообщение Rectangle , представляющее собой прямоугольник, отображающий широту и долготу, в виде двух точек, расположенных по диагонали друг от друга: «lo» и «hi».
message Rectangle {
// One corner of the rectangle.
Point lo = 1;
// The other corner of the rectangle.
Point hi = 2;
}
Также сообщение RouteNote , представляющее собой сообщение, отправленное в заданной точке маршрута.
message RouteNote {
// The location from which the message is sent.
Point location = 1;
// The message to be sent.
string message = 2;
}
Нам также потребуется сообщение RouteSummary . Это сообщение принимается в ответ на RPC-запрос RecordRoute , который описан в следующем разделе. Оно содержит количество полученных отдельных точек, количество обнаруженных объектов и общее пройденное расстояние как сумма расстояний между каждой точкой.
message RouteSummary {
// The number of points received.
int32 point_count = 1;
// The number of known features passed while traversing the route.
int32 feature_count = 2;
// The distance covered in metres.
int32 distance = 3;
// The duration of the traversal in seconds.
int32 elapsed_time = 4;
}
Определите методы обслуживания
Давайте сначала определим наш сервис, а затем определим наши сообщения. Чтобы определить сервис, вы указываете именованный сервис в файле .proto . Файл proto/routeguide.proto содержит структуру service с именем RouteGuide , которая определяет один или несколько методов, предоставляемых сервисом приложения.
Определите методы RPC внутри определения вашего сервиса, указав их типы запроса и ответа. В этом разделе практического задания давайте определим:
ListFeatures
Получает доступные Feature в пределах заданного Rectangle . Результаты передаются потоком, а не возвращаются сразу (например, в ответном сообщении с повторяющимся полем), поскольку прямоугольник может занимать большую площадь и содержать огромное количество объектов.
Подходящим типом для этого RPC является потоковый RPC на стороне сервера : клиент отправляет запрос на сервер и получает поток для чтения последовательности сообщений. Клиент читает из возвращенного потока до тех пор, пока не закончатся сообщения. Как видно в нашем примере, метод потоковой передачи на стороне сервера указывается путем размещения ключевого слова stream перед типом ответа.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
RecordRoute
Принимает поток Point на проходимом маршруте и возвращает RouteSummary после завершения обхода.
В данном случае представляется подходящим клиентский потоковый RPC: клиент записывает последовательность сообщений и отправляет их на сервер, снова используя предоставленный поток. После завершения записи сообщений клиент ожидает, пока сервер прочтет их все и вернет свой ответ. Метод клиентской потоковой передачи указывается путем добавления ключевого слова stream перед типом запроса.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
RouteChat
Принимает поток сообщений RouteNote , отправляемых во время прохождения маршрута, одновременно получая другие сообщения RouteNote (например, от других пользователей).
Это как раз тот случай, когда двунаправленная потоковая передача данных необходима. В RPC-запросе с двунаправленной потоковой передачей обе стороны отправляют последовательность сообщений, используя поток чтения и записи. Два потока работают независимо, поэтому клиенты и серверы могут читать и записывать данные в любом порядке.
Например, сервер может дождаться получения всех сообщений от клиентов, прежде чем записывать свои ответы, или же он может попеременно читать сообщение, а затем записывать его, или использовать какую-либо другую комбинацию операций чтения и записи.
Порядок сообщений в каждом потоке сохраняется. Этот тип метода указывается путем размещения ключевого слова stream перед запросом и ответом.
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
4. Сгенерируйте код клиента и сервера.
Мы уже предоставили вам сгенерированный код из файла .proto , находящегося в сгенерированной директории.
Если вы хотите .proto самостоятельно генерировать код из файла .proto или вносить в него изменения и тестировать их, обратитесь к этим инструкциям .
Сгенерированный код содержит:
- Определения структур для типов сообщений
Point,Feature,Rectangle,RouteNoteиRouteSummary. - Нам потребуется реализовать трейт сервиса:
route_guide_server::RouteGuide. - Тип клиента, который мы будем использовать для вызова сервера:
route_guide_client::RouteGuideClient<T>.
Далее мы реализуем методы на стороне сервера, чтобы при отправке клиентом запроса сервер мог ответить.
5. Внедрить сервис.
Для начала давайте рассмотрим, как создать сервер RouteGuide . Для того, чтобы наш сервис RouteGuide выполнял свою работу, необходимо выполнить две задачи:
- Реализация интерфейса сервиса, сгенерированного на основе определения нашего сервиса: выполнение фактической «работы» нашего сервиса.
- Запуск gRPC-сервера для прослушивания запросов от клиентов и их переадресации соответствующей реализации метода.
В src/server/server.rs мы можем включить сгенерированный код в область видимости с помощью макроса include_generated_proto! в gRPC и импортировать трейт RouteGuide и Point .
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
pub use grpc_pb::{
route_guide_server::{RouteGuideServer, RouteGuide},
Point, Feature, Rectangle, RouteNote, RouteSummary
};
Для начала можно определить структуру, представляющую наш сервис. Пока это можно сделать в src/server/server.rs :
#[derive(Debug)]
pub struct RouteGuideService {
features: Vec<Feature>,
}
Теперь нам нужно реализовать трейт route_guide_server::RouteGuide в сгенерированном коде.
Внедрить RouteGuide
Нам необходимо реализовать сгенерированный интерфейс RouteGuide . Вот как будет выглядеть реализация. Она уже есть в шаблоне.
#[tonic::async_trait]
impl RouteGuide for RouteGuideService {
async fn list_features(
&self,
request: Request<Rectangle>,
) -> Result<Response<ListFeaturesStream>, Status> {
...
}
async fn record_route(
&self,
request: Request<tonic::Streaming<Point>>,
) -> Result<Response<RouteSummary>, Status> {
...
}
async fn route_chat(
&self,
request: Request<tonic::Streaming<RouteNote>>,
) -> Result<Response<RouteChatStream>, Status> {
...
}
}
Давайте подробно рассмотрим каждую реализацию RPC.
Серверная потоковая передача RPC
Начнём с ListFeatures . Это потоковый RPC-вызов на стороне сервера, поэтому нам нужно отправить клиенту несколько Feature .
async fn list_features(
&self,
request: Request<Rectangle>,
) -> Result<Response<ListFeaturesStream>, Status> {
println!("ListFeatures = {:?}", request);
let (tx, rx) = mpsc::channel(4);
let features = self.features.clone();
tokio::spawn(async move {
for feature in &features[..] {
if in_range(&feature.location().to_owned(), request.get_ref()) {
println!(" => send {feature:?}");
tx.send(Ok(feature.clone())).await.unwrap();
}
}
println!(" /// done sending");
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream)))
}
Как видите, мы получаем объект запроса ( Rectangle , в котором наш клиент хочет найти Features ). На этот раз нам нужно вернуть поток значений. Мы создаём канал и запускаем новую асинхронную задачу, в которой выполняем поиск, отправляя в канал объекты Features, удовлетворяющие нашим ограничениям. Потоковая часть канала возвращается вызывающей стороне, обернутая в tonic::Response .
RPC потоковой передачи на стороне клиента
Теперь давайте рассмотрим нечто немного более сложное: метод потоковой передачи на стороне клиента RecordRoute , который получает поток Points от клиента и возвращает один RouteSummary с информацией о поездке. Он принимает поток в качестве входных данных, который сервер может использовать как для чтения, так и для записи сообщений. Он может перебирать сообщения клиента с помощью своего метода next() и возвращать свой единственный ответ.
async fn record_route(
&self,
request: Request<tonic::Streaming<Point>>,
) -> Result<Response<RouteSummary>, Status> {
println!("RecordRoute");
let mut stream = request.into_inner();
let mut summary = RouteSummary::default();
let mut last_point = None;
let now = Instant::now();
while let Some(point) = stream.next().await {
let point = point?;
println!(" ==> Point = {point:?}");
// Increment the point count
summary.set_point_count(summary.point_count() + 1);
// Find features
for feature in &self.features[..] {
if feature.location().latitude() == point.latitude() {
if feature.location().longitude() == point.longitude(){
summary.set_feature_count(summary.feature_count() + 1);
}
}
}
// Calculate the distance
if let Some(ref last_point) = last_point {
let new_dist = summary.distance() + calc_distance(last_point, &point);
summary.set_distance(new_dist);
}
last_point = Some(point);
}
summary.set_elapsed_time(now.elapsed().as_secs() as i32);
Ok(Response::new(summary))
}
В теле метода мы используем метод next() потока для многократного чтения запросов клиента в объект запроса (в данном случае Point ), пока не закончатся сообщения. Если значение равно None, поток всё ещё в порядке и может продолжать чтение.
Двунаправленный потоковый RPC
Наконец, давайте рассмотрим наш двунаправленный потоковый RPC RouteChat() .
async fn route_chat(
&self,
request: Request<tonic::Streaming<RouteNote>>,
) -> Result<Response<RouteChatStream>, Status> {
println!("RouteChat");
let mut notes: HashMap<(i32, i32), Vec<RouteNote>> = HashMap::new();
let mut stream = request.into_inner();
let output = async_stream::try_stream! {
while let Some(note) = stream.next().await {
let note = note?;
let location = note.location();
let key = (location.latitude(), location.longitude());
let location_notes = notes.entry(key).or_insert(vec![]);
location_notes.push(note);
for note in location_notes {
yield note.clone();
}
}
};
Ok(Response::new(Box::pin(output)))
}
На этот раз мы получаем поток, который, как и в нашем примере с потоковой передачей на стороне клиента, можно использовать для чтения и записи сообщений. Однако на этот раз мы возвращаем значения через поток нашего метода, пока клиент продолжает записывать сообщения в свой поток сообщений. Синтаксис чтения и записи здесь очень похож на наш метод потоковой передачи на стороне клиента, за исключением того, что сервер возвращает RouteChatStream . Хотя каждая сторона всегда будет получать сообщения другой стороны в том порядке, в котором они были записаны, и клиент, и сервер могут читать и записывать в любом порядке — потоки работают полностью независимо.
Мы создаём output поток с помощью try_stream! которая указывает на то, что поток может возвращать ошибки.
Запустите сервер
После реализации этого метода нам также необходимо запустить gRPC-сервер, чтобы клиенты могли фактически использовать наш сервис. Заполните main() .
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:10000".parse().unwrap();
println!("RouteGuideServer listening on: {addr}");
let route_guide = RouteGuideService {
features: load(),
};
let svc = RouteGuideServer::new(route_guide);
Server::builder().add_service(svc).serve(addr).await?;
Ok(())
}
Вот что происходит в main() , шаг за шагом:
- Укажите порт, который мы хотим использовать для приема запросов от клиентов.
- Создайте
RouteGuideServiceс загруженными функциями. - Создайте экземпляр gRPC-сервера, используя
RouteGuideServer::new()и созданный нами сервис. - Зарегистрируйте реализацию нашего сервиса на gRPC-сервере.
- Вызовите
serve()на сервере, указав порт, чтобы выполнить блокирующее ожидание до завершения процесса.
6. Создайте клиента.
В этом разделе мы рассмотрим создание Rust-клиента для нашего сервиса RouteGuide в src/client/client.rs .
Сначала включите сгенерированный код в область видимости.
mod grpc_pb {
grpc::include_generated_proto!("generated", "routeguide");
}
use grpc_pb::route_guide_client::RouteGuideClient;
use grpc_pb::{Point, Rectangle, RouteNote};
Методы вызова сервиса
Теперь давайте посмотрим, как мы вызываем методы нашего сервиса. В gRPC-Rust RPC-вызовы работают в блокирующем/синхронном режиме, что означает, что RPC-вызов ожидает ответа от сервера и либо возвращает ответ, либо ошибку.
Серверная потоковая передача RPC
Здесь мы вызываем серверный потоковый метод ListFeatures , который возвращает поток географических объектов Feature .
async fn print_features(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let rectangle = proto!(Rectangle {
lo: proto!(Point {
latitude: 400_000_000,
longitude: -750_000_000,
}),
hi: proto!(Point {
latitude: 420_000_000,
longitude: -730_000_000,
}),
});
let mut stream = client
.list_features(Request::new(rectangle))
.await?
.into_inner();
while let Some(feature) = stream.message().await? {
println!("FEATURE: Name = \"{}\", Lat = {}, Lon = {}",
feature.name(),
feature.location().latitude(),
feature.location().longitude());
}
Ok(())
}
Мы передаем методу запрос и получаем в ответ экземпляр ListFeaturesStream . Клиент может использовать поток ListFeaturesStream для чтения ответов сервера. Мы используем метод message() объекта ListFeaturesStream для многократного чтения ответов сервера в объект буфера протокола ответов (в данном случае Feature ), пока не закончатся сообщения.
RPC потоковой передачи на стороне клиента
В функции record_route мы преобразуем вектор точек в поток. Затем мы передаем этот поток в record_route() в качестве запроса и получаем один ответ RouteSummary после того, как поток будет полностью обработан сервером.
async fn run_record_route(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let mut rng = rand::rng();
let point_count: i32 = rng.random_range(2..100);
let mut points = vec![];
for _ in 0..=point_count {
points.push(random_point(&mut rng))
}
println!("Traversing {} points", points.len());
let request = Request::new(tokio_stream::iter(points));
match client.record_route(request).await {
Ok(response) => {
let response = response.into_inner();
println!("SUMMARY: Feature Count = {}, Distance = {}", response.feature_count(), response.distance())},
Err(e) => println!("something went wrong: {e:?}"),
}
Ok(())
}
Двунаправленный потоковый RPC
Наконец, давайте рассмотрим наш двунаправленный потоковый RPC RouteChat() . Мы передаем методу запрос на запись в поток и получаем обратно поток, из которого можем читать сообщения. На этот раз мы возвращаем значения через поток нашего метода, пока сервер еще записывает сообщения в свой поток сообщений.
async fn run_route_chat(client: &mut RouteGuideClient<Channel>) -> Result<(), Box<dyn Error>> {
let start = time::Instant::now();
let outbound = async_stream::stream! {
let mut interval = time::interval(Duration::from_secs(1));
for _ in 0..10 {
let time = interval.tick().await;
let elapsed = time.duration_since(start);
let note = proto!(RouteNote {
location: proto!(Point {
latitude: 409146138 + elapsed.as_secs() as i32,
longitude: -746188906,
}),
message: format!("at {elapsed:?}"),
});
yield note;
}
};
let response = client.route_chat(Request::new(outbound)).await?;
let mut inbound = response.into_inner();
while let Some(note) = inbound.message().await? {
println!("Note: Latitude = {}, Longitude = {}, Message = \"{}\"",
note.location().latitude(),
note.location().longitude(),
note.message());
}
Ok(())
}
Хотя каждая сторона всегда будет получать сообщения другой стороны в том порядке, в котором они были написаны, и клиент, и сервер могут читать и записывать в любом порядке — потоки работают полностью независимо.
Вызов вспомогательных методов
Для вызова методов сервиса нам сначала необходимо создать канал для связи с сервером. Мы создаём его, сначала создавая конечную точку, подключаясь к этой конечной точке и передавая созданный при подключении канал в RouteGuideClient::new() следующим образом:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
Ok(())
}
В main() выполните только что созданные нами методы.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create endpoint to connect to
let endpoint = Endpoint::new("http://[::1]:10000")?;
let channel = endpoint.connect().await?;
// Create a new client
let mut client = RouteGuideClient::new(channel);
println!("\n*** SERVER STREAMING ***");
print_features(&mut client).await?;
println!("\n*** CLIENT STREAMING ***");
run_record_route(&mut client).await?;
println!("\n*** BIDIRECTIONAL STREAMING ***");
run_route_chat(&mut client).await?;
Ok(())
}
7. Попробуйте.
Для начала, чтобы запустить наш клиент и сервер, добавим их в качестве целевых исполняемых файлов в наш крейт. Нам нужно соответствующим образом отредактировать файл Cargo.toml:
[[bin]]
name = "routeguide-server"
path = "src/server/server.rs"
[[bin]]
name = "routeguide-client"
path = "src/client/client.rs"
Как и в любом проекте, нам также необходимо продумать зависимости, необходимые для работы нашего кода. Для проектов на Rust зависимости будут указаны в Cargo.toml . Мы уже перечислили необходимые зависимости в файле Cargo.toml .
Затем выполните следующие команды из наших рабочих каталогов:
- Запустите сервер в одном терминале:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-server
- Запустите клиент из другого терминала:
RUSTFLAGS="-Awarnings" cargo run --bin routeguide-client
Вы увидите примерно такой результат:
*** SERVER STREAMING *** FEATURE: Name = "Patriots Path, Mendham, NJ 07945, USA", Lat = 407838351, Lon = -746143763 FEATURE: Name = "101 New Jersey 10, Whippany, NJ 07981, USA", Lat = 408122808, Lon = -743999179 FEATURE: Name = "U.S. 6, Shohola, PA 18458, USA", Lat = 413628156, Lon = -749015468 ... *** CLIENT STREAMING *** Traversing 86 points SUMMARY: Feature Count = 0, Distance = 803709356 *** BIDIRECTIONAL STREAMING *** Note: Latitude = 409146138, Longitude = -746188906, Message = "at 112.45µs" Note: Latitude = 409146139, Longitude = -746188906, Message = "at 1.00011245s" Note: Latitude = 409146140, Longitude = -746188906, Message = "at 2.00011245s"
8. Что дальше?
- Узнайте, как работает gRPC, в разделах «Введение в gRPC» и «Основные концепции» .
- Пройдите базовый курс .