如何解决使用Apache Kafka Java和Micronaut应用程序进行Rest API设计
在使用Micronaut框架进行Rest API CRUD操作时,我具有上面的图。我有一个单向流,控制器需要从Kafka使用者API中知道操作的执行情况。
例如
- 要从数据库中获取所有商品列表,请在消费者级别执行
- 在消费者级别添加/更新/删除操作
我在消费者级别(监听器)具有下面的Micronaut反应代码,以便从mongo数据库中获取产品列表
@Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
public Flowable<Product> findByFreeText(String text) {
LOG.info(String.format("Listener --> Listening value = %s",text));
return Flowable.fromPublisher(repository.getCollection("product",Product.class)
.find(new Document("$text",new Document("$search",text)
.append("$caseSensitive",false)
.append("$diacriticSensitive",false)
)));
}
生产者界面
@KafkaClient
public interface IProductProducer {
@Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
Flowable<Product> findFreeText(String text);
}
生产者的实现
@Override
public Flowable<ProductViewModel> findFreeText(String text) {
LOG.info("Manager --> Finding all the products");
List<ProductViewModel> model = new ArrayList<>();
iProductProducer.findFreeText(text).subscribe(item -> {
System.out.println(item);
},error ->{
System.out.println(error);
});
return Flowable.just(new ProductViewModel());
}
根据Micronaut文档https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaListenerMethods
根据Micronaut文档接收和返回反应类型
@Topic("reactive-products")
public Single<Product> receive(
@KafkaKey String brand,Single<Product> productFlowable) {
return productFlowable.doOnSuccess((product) ->
System.out.println("Got Product - " + product.getName() + " by " + brand)
);
}
在生产者实现中,订阅时我从未收到任何值。
iProductProducer.findFreeText(text).subscribe(item -> {
System.out.println(item);
},error ->{
System.out.println(error);
});
我想知道以上流程是否是使用kafka执行REST API CRUD操作的正确方法?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。