微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

当我等待 Kafka 消息然后在 Actix Web 中返回一个值时,瓶颈在哪里?

如何解决当我等待 Kafka 消息然后在 Actix Web 中返回一个值时,瓶颈在哪里?

我正在尝试使用 Kafka 在用 Rust 和 Node.js 编写的 2 个微服务之间进行通信。

我使用 actix-web 作为 Web 框架,使用 rdkafka 作为 Rust 的 Kafka 客户端。在 Node.js 端,它从数据库查询内容并通过 Kafka 将其作为 JSON 返回给 Rust 服务器。

流程:

Request -> Actix Web -> Kafka -> Node -> Kafka -> Actix Web -> Response

逻辑是请求命中Actix Web上的一个端点,然后创建一条消息向另一个微服务请求一些东西并等待它发回(通过Kafka消息密钥验证),并将其作为HTTP返回给用户回复


我让它工作了,但性能很慢(我正在用 wrk 进行压力测试)。

我不知道为什么它的执行速度很慢,但是当我深入挖掘时,我发现如果我在 Node.js 端添加延迟 5 秒,然后我创建了 2 个请求到 actix-web 的请求,其中请求是不同的一秒后,它将响应 5 秒和 10 秒的延迟。

基准测试是每秒大约 3k 个请求,使用以下命令:

wrk http://localhost:8080 -d 20s -t 2 -c 200

这让我猜测可能有什么东西阻塞了每个请求的线程。

这里是 source coderepo

use std::{
    sync::Arc,time::{ 
        Duration,Instant
    }
};

use actix_web::{
    App,HttpServer,get,rt,web::Data
};

use futures::TryStreamExt;
use tokio::time::sleep;

use num_cpus;
use rand::{
    distributions::Alphanumeric,Rng
};

use rdkafka::{
    ClientConfig,Message,consumer::{
        Consumer,StreamConsumer
    },producer::{
        FutureProducer,FutureRecord
    }
};

const TOPIC: &'static str = "exp-queue_general-5";

#[derive(Clone)]
pub struct AppState {
    pub producer: Arc<FutureProducer>,pub receiver: flume::Receiver<String>
}

fn generate_key() -> String {
    rand::thread_rng()
        .sample_iter(&Alphanumeric)
        .take(8)
        .map(char::from)
        .collect()
}

#[get("/")]
async fn landing(state: Data<AppState>) -> String {
    let key = generate_key();
    let t1 = Instant::Now();

    let producer = &state.producer;
    let receiver = &state.receiver;

    producer
        .send(
            FutureRecord::to(&format!("{}-forth",TOPIC))
                .key(&key)
                .payload("Hello From Rust"),Duration::from_secs(8)
        )
        .await
        .expect("Unable to send message");

    println!("Producer take {} ms",t1.elapsed().as_millis());
    
    let t2 = Instant::Now();
    let value = receiver
        .recv()
        .unwrap_or("".to_owned());

    println!("Receiver take {} ms",t2.elapsed().as_millis());
    println!("Process take {} ms\n",t1.elapsed().as_millis());

    value
}

#[get("/status")]
async fn heartbeat() -> &'static str {
    // ? Concurrency delay check
    sleep(Duration::from_secs(1)).await;

    "Working"
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // ? Assume that the whole node is just Rust instance
    let mut cpus = num_cpus::get() / 2 - 1;

    if cpus < 1 {
        cpus = 1;
    }

    println!("cpus {}",cpus);
    
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers","localhost:9092")
        .set("linger.ms","25")
        .set("queue.buffering.max.messages","1000000")
        .set("queue.buffering.max.ms","25")
        .set("compression.type","lz4")
        .set("retries","40000")
        .set("retries","0")
        .set("message.timeout.ms","8000")
        .create()
        .expect("Kafka config");

    let (tx,rx) = flume::unbounded::<String>();

    rt::spawn(async move {
        let consumer: StreamConsumer = ClientConfig::new()
            .set("bootstrap.servers","localhost:9092")
            .set("group.id",&format!("{}-back",TOPIC))
            .set("queued.min.messages","200000")
            .set("fetch.error.backoff.ms","250")
            .set("socket.blocking.max.ms","500")
            .create()
            .expect("Kafka config");

        consumer
            .subscribe(&vec![format!("{}-back",TOPIC).as_ref()])
            .expect("Can't subscribe");

        consumer
            .stream()
            .try_for_each_concurrent(
                cpus,|message| {
                    let txx = tx.clone();

                    async move {
                        let result = String::from_utf8_lossy(
                            message
                            .payload()
                            .unwrap_or("Error serializing".as_bytes())
                        ).to_string();

                        txx.send(result).expect("Tx not sending");

                        Ok(())
                    }

                }
            )
            .await
            .expect("Error reading stream");
    });

    let state = AppState {
        producer: Arc::new(producer),receiver: rx
    };

    HttpServer::new(move || {
        App::new()
            .app_data(Data::new(state.clone()))
            .service(landing)
            .service(heartbeat)
    })
    .workers(cpus)
    .bind("0.0.0.0:8080")?
    .run()
    .await
}

我在 GitHub 上发现了一些已解决的问题,这些问题建议使用 actor,而我也使用了 a separate branch

这比主分支的性能更差,每秒执行大约 200-300 个请求。


我不知道瓶颈在哪里,也不知道是什么阻止了请求。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。