如何解决impl AsyncRead for tonic::Streaming
我正在尝试使用 routeguide tutorial,并将客户端变成 rocket 服务器。我只是接收响应并将 gRPC 转换为字符串。
service RouteGuide {
rpc GetFeature(Point) returns (Feature) {}
rpc ListFeatures(Rectangle) returns (stream Feature) {}
}
这对于 GetFeature 来说已经足够好了。对于 ListFeatures 查询,就像 Tonic 允许客户端响应中的流一样,我想将其传递给 Rocket 客户端。我看到 Rocket 支持 streaming 响应,但我需要实现 AsyncRead 特性。
有没有办法做这样的事情?以下是我正在做的事情的精简版:
struct FeatureStream {
stream: tonic::Streaming<Feature>,}
impl AsyncRead for FeatureStream {
fn poll_read(
self: Pin<&mut Self>,cx: &mut Context<'_>,buf: &mut ReadBuf<'_>,) -> Poll<std::io::Result<()>> {
// Write out as utf8 any response messages.
match Pin::new(&mut self.stream.message()).poll(cx) {
Poll::Pending => Poll::Pending,Poll::Ready(feature) => Poll::Pending,}
}
}
#[get("/list_features")]
async fn list_features(client: State<'_,RouteGuideClient<Channel>>) -> Stream<FeatureStream> {
let rectangle = Rectangle {
low: Some(Point {
latitude: 400_000_000,longitude: -750_000_000,}),high: Some(Point {
latitude: 420_000_000,longitude: -730_000_000,};
let mut client = client.inner().clone();
let stream = client
.list_features(Request::new(rectangle))
.await
.unwrap()
.into_inner();
Stream::from(FeatureStream { stream })
}
#[rocket::launch]
async fn rocket() -> rocket::Rocket {
rocket::ignite()
.manage(
create_route_guide_client("http://[::1]:10000")
.await
.unwrap(),)
.mount("/",rocket::routes![list_features,])
}
出现错误:
error[E0277]: `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r,'s,'t0,'t1,'t2> {ResumeTy,&'r mut Streaming<Feature>,[closure@Streaming<Feature>::message::{closure#0}::{closure#0}],rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>,()}]>` cannot be unpinned
--> src/web_user.rs:34:15
|
34 | match Pin::new(&mut self.stream.message()).poll(cx) {
| ^^^^^^^^ within `impl std::future::Future`,the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r,()}]>`
|
::: /home/matan/.cargo/registry/src/github.com-1ecc6299db9ec823/tonic-0.4.0/src/codec/decode.rs:106:40
|
106 | pub async fn message(&mut self) -> Result<Option<T>,Status> {
| ------------------------- within this `impl std::future::Future`
|
= note: required because it appears within the type `impl std::future::Future`
= note: required because it appears within the type `impl std::future::Future`
= note: required by `Pin::<P>::new`
解决方法
问题是由 Future
生成的 tonic::Streaming<Feature>::message()
没有实现 Unpin
,因为它是一个 async 函数。让我们将此类型标记为 MessageFuture
,您无法安全地固定 &mut MessageFuture
指针,因为解除引用的类型 MessageFuture
没有实现 Unpin
。
为什么不安全?
从 reference 开始,Unpin
的实现带来:
固定后可以安全移动的类型。
这意味着如果 T:!Unpin
那么 Pin<&mut T>
是不可移动的,这很重要,因为异步块创建的 Future
没有 Unpin
实现,因为它可能持有成员,如果您移动 T
,此引用的指针也将被移动,但引用仍将指向相同的地址,为防止这种情况,它不应该是可移动的。请阅读"Pinning" section from async-book以形象化原因。
注意: T:!Unpin
表示 T
是没有 Unpin
实现的类型。
解决方案
message()
函数是从 tonic::Streaming<T>
中挑选下一条消息的助手。您不需要特别调用 message()
从流中选择下一个元素,您的结构中已经有了实际的流。
struct FeatureStream {stream: tonic::Streaming<Feature>}
您可以等待 AsyncRead
的下一条消息,例如:
impl AsyncRead for FeatureStream {
fn poll_read(
mut self: Pin<&mut Self>,cx: &mut Context<'_>,buf: &mut ReadBuf<'_>,) -> Poll<std::io::Result<()>> {
//it returns Pending for all cases as your code does,you can change it as you want
match self.stream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(m))) => Poll::Pending,Poll::Ready(Some(Err(e))) => Poll::Pending,Poll::Ready(None) => Poll::Pending,Poll::Pending => Poll::Pending
}
}
}
请注意,tonic::Streaming<T>
实现了 Unpin
(reference)
感谢 Omer Erden 回答这个问题。所以它归结为基于futures::Stream trait 实现AsyncRead,这是tonic::Streaming 实现的。这是我实际使用的代码。
impl AsyncRead for FeatureStream {
fn poll_read(
mut self: Pin<&mut Self>,) -> Poll<std::io::Result<()>> {
use futures::stream::StreamExt;
use std::io::{Error,ErrorKind};
match self.stream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(m))) => {
buf.put_slice(format!("{:?}\n",m).as_bytes());
Poll::Ready(Ok(()))
}
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Err(Error::new(ErrorKind::Other,format!("{:?}",e))))
}
Poll::Ready(None) => {
// None from a stream means the stream terminated. To indicate
// that from AsyncRead we return Ok and leave buf unchanged.
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,}
}
}
与此同时,我的解决方法是创建一个 TcpStream(它实现 AsyncRead)的两端并返回它的一端,同时生成一个单独的任务来实际写出结果。
#[get("/list_features")]
async fn list_features(
client: State<'_,RouteGuideClient<Channel>>,tasks: State<'_,Mutex<Vec<tokio::task::JoinHandle<()>>>>,) -> Result<Stream<TcpStream>,Debug<std::io::Error>> {
let mut client = client.inner().clone();
let mut feature_stream = client
.list_features(Request::new(Rectangle {
low: Some(Point {
latitude: 400000000,longitude: -750000000,}),high: Some(Point {
latitude: 420000000,longitude: -730000000,}))
.await
.unwrap()
.into_inner();
// Port 0 tells to operating system to choose an unused port.
let tcp_listener = TcpListener::bind(("127.0.0.1",0)).await?;
let socket_addr = tcp_listener.local_addr().unwrap();
tasks.lock().unwrap().push(tokio::spawn(async move {
let mut tcp_stream = TcpStream::connect(socket_addr).await.unwrap();
while let Some(feature) = feature_stream.message().await.unwrap() {
match tcp_stream
.write_all(format!("{:?}\n",feature).as_bytes())
.await
{
Ok(()) => (),Err(e) => panic!(e),}
}
println!("End task");
()
}));
Ok(Stream::from(tcp_listener.accept().await?.0))
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。