如何解决当我等待 Kafka 消息然后在 Actix Web 中返回一个值时,瓶颈在哪里?
我正在尝试使用 Kafka 在用 Rust 和 Node.js 编写的 2 个微服务之间进行通信。
我使用 actix-web 作为 Web 框架,使用 rdkafka 作为 Rust 的 Kafka 客户端。在 Node.js 端,它从数据库中查询内容并通过 Kafka 将其作为 JSON 返回给 Rust 服务器。
流程:
Request -> Actix Web -> Kafka -> Node -> Kafka -> Actix Web -> Response
逻辑是请求命中Actix Web上的一个端点,然后创建一条消息向另一个微服务请求一些东西并等待它发回(通过Kafka消息密钥验证),并将其作为HTTP返回给用户回复。
我让它工作了,但性能很慢(我正在用 wrk
进行压力测试)。
我不知道为什么它的执行速度很慢,但是当我深入挖掘时,我发现如果我在 Node.js 端添加延迟 5 秒,然后我创建了 2 个请求到 actix-web 的请求,其中请求是不同的一秒后,它将响应 5 秒和 10 秒的延迟。
基准测试是每秒大约 3k 个请求,使用以下命令:
wrk http://localhost:8080 -d 20s -t 2 -c 200
这让我猜测可能有什么东西阻塞了每个请求的线程。
这里是 source code 和 repo:
use std::{
sync::Arc,time::{
Duration,Instant
}
};
use actix_web::{
App,HttpServer,get,rt,web::Data
};
use futures::TryStreamExt;
use tokio::time::sleep;
use num_cpus;
use rand::{
distributions::Alphanumeric,Rng
};
use rdkafka::{
ClientConfig,Message,consumer::{
Consumer,StreamConsumer
},producer::{
FutureProducer,FutureRecord
}
};
const TOPIC: &'static str = "exp-queue_general-5";
#[derive(Clone)]
pub struct AppState {
pub producer: Arc<FutureProducer>,pub receiver: flume::Receiver<String>
}
fn generate_key() -> String {
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect()
}
#[get("/")]
async fn landing(state: Data<AppState>) -> String {
let key = generate_key();
let t1 = Instant::Now();
let producer = &state.producer;
let receiver = &state.receiver;
producer
.send(
FutureRecord::to(&format!("{}-forth",TOPIC))
.key(&key)
.payload("Hello From Rust"),Duration::from_secs(8)
)
.await
.expect("Unable to send message");
println!("Producer take {} ms",t1.elapsed().as_millis());
let t2 = Instant::Now();
let value = receiver
.recv()
.unwrap_or("".to_owned());
println!("Receiver take {} ms",t2.elapsed().as_millis());
println!("Process take {} ms\n",t1.elapsed().as_millis());
value
}
#[get("/status")]
async fn heartbeat() -> &'static str {
// ? Concurrency delay check
sleep(Duration::from_secs(1)).await;
"Working"
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// ? Assume that the whole node is just Rust instance
let mut cpus = num_cpus::get() / 2 - 1;
if cpus < 1 {
cpus = 1;
}
println!("cpus {}",cpus);
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers","localhost:9092")
.set("linger.ms","25")
.set("queue.buffering.max.messages","1000000")
.set("queue.buffering.max.ms","25")
.set("compression.type","lz4")
.set("retries","40000")
.set("retries","0")
.set("message.timeout.ms","8000")
.create()
.expect("Kafka config");
let (tx,rx) = flume::unbounded::<String>();
rt::spawn(async move {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers","localhost:9092")
.set("group.id",&format!("{}-back",TOPIC))
.set("queued.min.messages","200000")
.set("fetch.error.backoff.ms","250")
.set("socket.blocking.max.ms","500")
.create()
.expect("Kafka config");
consumer
.subscribe(&vec![format!("{}-back",TOPIC).as_ref()])
.expect("Can't subscribe");
consumer
.stream()
.try_for_each_concurrent(
cpus,|message| {
let txx = tx.clone();
async move {
let result = String::from_utf8_lossy(
message
.payload()
.unwrap_or("Error serializing".as_bytes())
).to_string();
txx.send(result).expect("Tx not sending");
Ok(())
}
}
)
.await
.expect("Error reading stream");
});
let state = AppState {
producer: Arc::new(producer),receiver: rx
};
HttpServer::new(move || {
App::new()
.app_data(Data::new(state.clone()))
.service(landing)
.service(heartbeat)
})
.workers(cpus)
.bind("0.0.0.0:8080")?
.run()
.await
}
我在 GitHub 上发现了一些已解决的问题,这些问题建议使用 actor,而我也使用了 a separate branch。
这比主分支的性能更差,每秒执行大约 200-300 个请求。
我不知道瓶颈在哪里,也不知道是什么阻止了请求。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。