如何解决apache kafka的反应性和非阻塞方法Micronaut
我正在尝试从Micronaut kafka实现中获得Non-Blocking响应,但是返回值不起作用。
public class ProductManager implements IProductManager{
private final ApplicationContext applicationContext;
public ProductManager(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
public ProductViewModel findFreeText(String text) {
final ProductViewModel model = new ProductViewModel();
IProductProducer client = applicationContext.getBean(IProductProducer.class);
client.findFreeText(text).subscribe(item -> {
System.out.println(item);
});
return model;
}
}
subscribe方法不起作用,调试器永远不会出现这一点。我想从kafka监听器中获取价值
卡夫卡制片人
@KafkaClient
public interface IProductProducer {
@Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
Flowable<ProductViewModel> findFreeText(String text);
}
Kafka监听器
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class ProductListener {
private static final Logger LOG = LoggerFactory.getLogger(ProductListener.class);
@Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
public Flowable<Product>> findByFreeText(String text) {
LOG.info("Listening value = ",text);
return Flowable.just(new Product("This is the test","This is test description"));
}
}
Micronaut非阻塞方法文档
https://docs.micronaut.io/1.0.0.M3/guide/index.html#_reactive_and_non_blocking_method_definitions
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。