如何解决如何为不同的 Apache Camel 路由设置几个不同的 WebFlux 客户端属性?
在路由设置中,我们在声明路由之前调用了 WebClient.build():
@Override
public void configure() {
createSubscription(activeProfile.equalsIgnoreCase("RESTART"));
from(String.format("reactive-streams:%s",streamName))
.to("log:camel.proxy?level=INFO&groupInterval=500000")
.to(String.format("kafka:%s?brokers=%s",kafkaTopic,kafkabrokerUrls));
}
private void createSubscription(boolean restart) {
WebClient.builder()
.defaultHeader(HttpHeaders.CONTENT_TYPE,MediaType.TEXT_XML_VALUE)
.build()
.post()
.uri(initialRequestUri)
.body(BodyInserters.fromObject(restart ? String.format(restartRequestBody,zoneddatetime.Now(ZoneId.of("UTC")).toString().replace("[UTC]","")) : initialRequestBody))
.retrieve()
.bodyToMono(String.class)
.map(initResp ->
new JSONObject(initResp)
.getJSONObject("RESPONSE")
.getJSONArray("RESULT")
.getJSONObject(0)
.getJSONObject("INFO")
.getString("SSEURL")
)
.flatMapMany(url -> {
log.info(url);
return WebClient.create()
.get()
.uri(url)
.retrieve()
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
})
.flatMap(sse -> {
val data = new JSONObject(sse.data())
.getJSONObject("RESPONSE")
.getJSONArray("RESULT")
.getJSONObject(0)
.getJSONArray(apiName);
val list = new ArrayList<String>();
for (int i = 0; i < data.length(); i++) {
list.add(data.getJSONObject(i).toString());
}
return Flux.fromIterable(list);
}
);
}
)
.onBackpressureBuffer()
.flatMap(msg -> camelReactiveStreamsService.toStream(streamName,msg,String.class))
.doFirst(() -> log.info(String.format("Reactive stream %s was %s",streamName,restart ? "restarted" : "started")))
.doOnError(err -> {
log.error(String.format("Reactive stream %s has terminated with error,restarting",streamName),err);
createSubscription(true);
})
.doOnComplete(() -> {
log.warn(String.format("Reactive stream %s has completed,streamName));
createSubscription(true);
})
.subscribe();
}
据我所知,WebClient 设置是为整个 Spring Boot 应用程序设置的,而不是 Apache Camel 的特定路由(它不会以某种方式弯曲到特定的路由 id 或 url),这就是为什么新路由使用新的其他 url 的反应性流和其他带有标题/初始消息的需求也会得到这个设置,什么是不需要的。
那么,这里的问题是,是否可以进行特定的 WebClient 设置,而不是与整个应用程序相关联,而是与特定路由相关联并使其应用于该路由?
Spring DSL 可以进行这种配置吗?
解决方法
在那里应用的方式相当复杂:
-
创建 2 个路由,第一个首先执行并且只执行一次并且触发特定 bean 的特定方法,通过方法参数传递
WebClient.builder()
的设置并执行对 WebFlux 的订阅.是的,反应流设置是在 Spring Boot 应用程序的 Spring 上下文中完成的,而不是在 Apache Camel 上下文中完成。因此它与路由没有直接关联,而不是在特定路由启动时被调用。所以路线看起来像:<?xml version="1.0" encoding="UTF-8"?>
-
提供豆子。我已经把它放到 Spring Boot 应用程序中,而不是像下面这样的 Apache Camel 上下文。这里的缺点是,无论特定路线是否有效,我都必须将其放在这里。所以它一直在记忆中。
import org.apache.camel.CamelContext; import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; import org.json.JSONArray; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.ArrayList; @Component public class WebFluxSetUp { private final Logger logger = LoggerFactory.getLogger(WebFluxSetUp.class); private final CamelContext camelContext; private final CamelReactiveStreamsService camelReactiveStreamsService; WebFluxSetUp(CamelContext camelContext,CamelReactiveStreamsService camelReactiveStreamsService) { this.camelContext = camelContext; this.camelReactiveStreamsService = camelReactiveStreamsService; } public void executeWebfluxSetup(boolean restart,String initialRequestUri,String restartRequestBody,String initialRequestBody,String apiName,String streamName) { { WebClient.builder().defaultHeader(HttpHeaders.CONTENT_TYPE,MediaType.TEXT_XML_VALUE).build().post().uri(initialRequestUri).body(BodyInserters.fromObject(restart ? String.format(restartRequestBody,ZonedDateTime.now(ZoneId.of("UTC")).toString().replace("[UTC]","")) : initialRequestBody)).retrieve().bodyToMono(String.class).map(initResp -> new JSONObject(initResp).getJSONObject("RESPONSE").getJSONArray("RESULT").getJSONObject(0).getJSONObject("INFO").getString("SSEURL")).flatMapMany(url -> { logger.info(url); return WebClient.create().get().uri(url).retrieve().bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() { }).flatMap(sse -> { JSONArray data = new JSONObject(sse.data()).getJSONObject("RESPONSE").getJSONArray("RESULT").getJSONObject(0).getJSONArray(apiName); ArrayList<String> list = new ArrayList<String>(); for (int i = 0; i < data.length(); i++) { list.add(data.getJSONObject(i).toString()); } return Flux.fromIterable(list); }); }).onBackpressureBuffer().flatMap(msg -> camelReactiveStreamsService.toStream(streamName,msg,String.class)).doFirst(() -> logger.info(String.format("Reactive stream %s was %s",streamName,restart ? "restarted" : "started"))).doOnError(err -> { logger.error(String.format("Reactive stream %s has terminated with error,restarting",streamName),err); executeWebfluxSetup(true,initialRequestUri,restartRequestBody,initialRequestBody,apiName,streamName); }).doOnComplete(() -> { logger.warn(String.format("Reactive stream %s has completed,streamName)); executeWebfluxSetup(true,streamName); }).subscribe(); } } }
-
其他缺点是当路由停止时,WebFlux 客户端仍在尝试向反应流 url 发送垃圾邮件。并且没有与路由相关的 api/事件处理程序来阻止它并使未编码到特定路由。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。