微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

impl AsyncRead for tonic::Streaming

如何解决impl AsyncRead for tonic::Streaming

我正在尝试使用 routeguide tutorial,并将客户端变成 rocket 服务器。我只是接收响应并将 gRPC 转换为字符串。

service RouteGuide {
    rpc GetFeature(Point) returns (Feature) {}
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
}

这对于 GetFeature 来说已经足够好了。对于 ListFeatures 查询,就像 Tonic 允许客户端响应中的流一样,我想将其传递给 Rocket 客户端。我看到 Rocket 支持 streaming 响应,但我需要实现 AsyncRead 特性。

有没有办法做这样的事情?以下是我正在做的事情的精简版:

struct FeatureStream {
    stream: tonic::Streaming<Feature>,}

impl AsyncRead for FeatureStream {
    fn poll_read(
        self: Pin<&mut Self>,cx: &mut Context<'_>,buf: &mut ReadBuf<'_>,) -> Poll<std::io::Result<()>> {
        // Write out as utf8 any response messages.
        match Pin::new(&mut self.stream.message()).poll(cx) {
            Poll::Pending => Poll::Pending,Poll::Ready(feature) => Poll::Pending,}
    }
}

#[get("/list_features")]
async fn list_features(client: State<'_,RouteGuideClient<Channel>>) -> Stream<FeatureStream> {
    let rectangle = Rectangle {
        low: Some(Point {
            latitude: 400_000_000,longitude: -750_000_000,}),high: Some(Point {
            latitude: 420_000_000,longitude: -730_000_000,};
    let mut client = client.inner().clone();
    let stream = client
        .list_features(Request::new(rectangle))
        .await
        .unwrap()
        .into_inner();
    Stream::from(FeatureStream { stream })
}

#[rocket::launch]
async fn rocket() -> rocket::Rocket {
    rocket::ignite()
        .manage(
            create_route_guide_client("http://[::1]:10000")
                .await
                .unwrap(),)
        .mount("/",rocket::routes![list_features,])
}

出现错误

error[E0277]: `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r,'s,'t0,'t1,'t2> {ResumeTy,&'r mut Streaming<Feature>,[closure@Streaming<Feature>::message::{closure#0}::{closure#0}],rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>,()}]>` cannot be unpinned
   --> src/web_user.rs:34:15
    |
34  |         match Pin::new(&mut self.stream.message()).poll(cx) {
    |               ^^^^^^^^ within `impl std::future::Future`,the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r,()}]>`
    | 
   ::: /home/matan/.cargo/registry/src/github.com-1ecc6299db9ec823/tonic-0.4.0/src/codec/decode.rs:106:40
    |
106 |     pub async fn message(&mut self) -> Result<Option<T>,Status> {
    |                                        ------------------------- within this `impl std::future::Future`
    |
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required by `Pin::<P>::new`

解决方法

问题是由 Future 生成的 tonic::Streaming<Feature>::message() 没有实现 Unpin,因为它是一个 async 函数。让我们将此类型标记为 MessageFuture,您无法安全地固定 &mut MessageFuture 指针,因为解除引用的类型 MessageFuture 没有实现 Unpin

为什么不安全?

reference 开始,Unpin 的实现带来:

固定后可以安全移动的类型。

这意味着如果 T:!Unpin 那么 Pin<&mut T> 是不可移动的,这很重要,因为异步块创建的 Future 没有 Unpin 实现,因为它可能持有成员,如果您移动 T,此引用的指针也将被移动,但引用仍将指向相同的地址,为防止这种情况,它不应该是可移动的。请阅读"Pinning" section from async-book以形象化原因。

注意: T:!Unpin 表示 T 是没有 Unpin 实现的类型。

解决方案

message() 函数是从 tonic::Streaming<T> 中挑选下一条消息的助手。您不需要特别调用 message() 从流中选择下一个元素,您的结构中已经有了实际的流。

struct FeatureStream {stream: tonic::Streaming<Feature>}

您可以等待 AsyncRead 的下一条消息,例如:

impl AsyncRead for FeatureStream {
    fn poll_read(
        mut self: Pin<&mut Self>,cx: &mut Context<'_>,buf: &mut ReadBuf<'_>,) -> Poll<std::io::Result<()>> {

       //it returns Pending for all cases as your code does,you can change it as you want
        match self.stream.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(m))) => Poll::Pending,Poll::Ready(Some(Err(e))) => Poll::Pending,Poll::Ready(None) => Poll::Pending,Poll::Pending => Poll::Pending
        }
    }
}

请注意,tonic::Streaming<T> 实现了 Unpin(reference)

,

感谢 Omer Erden 回答这个问题。所以它归结为基于futures::Stream trait 实现AsyncRead,这是tonic::Streaming 实现的。这是我实际使用的代码。

impl AsyncRead for FeatureStream {
    fn poll_read(
        mut self: Pin<&mut Self>,) -> Poll<std::io::Result<()>> {
        use futures::stream::StreamExt;
        use std::io::{Error,ErrorKind};

        match self.stream.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(m))) => {
                buf.put_slice(format!("{:?}\n",m).as_bytes());
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Some(Err(e))) => {
                Poll::Ready(Err(Error::new(ErrorKind::Other,format!("{:?}",e))))
            }
            Poll::Ready(None) => {
                // None from a stream means the stream terminated. To indicate
                // that from AsyncRead we return Ok and leave buf unchanged.
                Poll::Ready(Ok(()))
            }
            Poll::Pending => Poll::Pending,}
    }
}

与此同时,我的解决方法是创建一个 TcpStream(它实现 AsyncRead)的两端并返回它的一端,同时生成一个单独的任务来实际写出结果。

#[get("/list_features")]
async fn list_features(
    client: State<'_,RouteGuideClient<Channel>>,tasks: State<'_,Mutex<Vec<tokio::task::JoinHandle<()>>>>,) -> Result<Stream<TcpStream>,Debug<std::io::Error>> {
    let mut client = client.inner().clone();
    let mut feature_stream = client
        .list_features(Request::new(Rectangle {
            low: Some(Point {
                latitude: 400000000,longitude: -750000000,}),high: Some(Point {
                latitude: 420000000,longitude: -730000000,}))
        .await
        .unwrap()
        .into_inner();

    // Port 0 tells to operating system to choose an unused port.
    let tcp_listener = TcpListener::bind(("127.0.0.1",0)).await?;
    let socket_addr = tcp_listener.local_addr().unwrap();
    tasks.lock().unwrap().push(tokio::spawn(async move {
        let mut tcp_stream = TcpStream::connect(socket_addr).await.unwrap();

        while let Some(feature) = feature_stream.message().await.unwrap() {
            match tcp_stream
                .write_all(format!("{:?}\n",feature).as_bytes())
                .await
            {
                Ok(()) => (),Err(e) => panic!(e),}
        }
        println!("End task");
        ()
    }));
    Ok(Stream::from(tcp_listener.accept().await?.0))
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。