微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何为不同的 Apache Camel 路由设置几个不同的 WebFlux 客户端属性?

如何解决如何为不同的 Apache Camel 路由设置几个不同的 WebFlux 客户端属性?

在路由设置中,我们在声明路由之前调用了 WebClient.build():

@Override
  public void configure() {
    createSubscription(activeProfile.equalsIgnoreCase("RESTART"));
    from(String.format("reactive-streams:%s",streamName))
        .to("log:camel.proxy?level=INFO&groupInterval=500000")
        .to(String.format("kafka:%s?brokers=%s",kafkaTopic,kafkabrokerUrls));
  }

  private void createSubscription(boolean restart) {
    WebClient.builder()
        .defaultHeader(HttpHeaders.CONTENT_TYPE,MediaType.TEXT_XML_VALUE)
        .build()
        .post()
        .uri(initialRequestUri)
        .body(BodyInserters.fromObject(restart ? String.format(restartRequestBody,zoneddatetime.Now(ZoneId.of("UTC")).toString().replace("[UTC]","")) : initialRequestBody))
        .retrieve()
        .bodyToMono(String.class)
        .map(initResp ->
            new JSONObject(initResp)
                .getJSONObject("RESPONSE")
                .getJSONArray("RESULT")
                .getJSONObject(0)
                .getJSONObject("INFO")
                .getString("SSEURL")
        )
        .flatMapMany(url -> {
          log.info(url);
          return WebClient.create()
              .get()
              .uri(url)
              .retrieve()
              .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
              })
              .flatMap(sse -> {
                    val data = new JSONObject(sse.data())
                        .getJSONObject("RESPONSE")
                        .getJSONArray("RESULT")
                        .getJSONObject(0)
                        .getJSONArray(apiName);
                    val list = new ArrayList<String>();
                    for (int i = 0; i < data.length(); i++) {
                      list.add(data.getJSONObject(i).toString());
                    }
                    return Flux.fromIterable(list);
                  }
              );
            }
        )
        .onBackpressureBuffer()
        .flatMap(msg -> camelReactiveStreamsService.toStream(streamName,msg,String.class))
        .doFirst(() -> log.info(String.format("Reactive stream %s was %s",streamName,restart ? "restarted" : "started")))
        .doOnError(err -> {
          log.error(String.format("Reactive stream %s has terminated with error,restarting",streamName),err);
          createSubscription(true);
        })
        .doOnComplete(() -> {
          log.warn(String.format("Reactive stream %s has completed,streamName));
          createSubscription(true);
        })
        .subscribe();
  }

据我所知,WebClient 设置是为整个 Spring Boot 应用程序设置的,而不是 Apache Camel 的特定路由(它不会以某种方式弯曲到特定的路由 id 或 url),这就是为什么新路由使用新的其他 url 的反应性流和其他带有标题/初始消息的需求也会得到这个设置,什么是不需要的。

那么,这里的问题是,是否可以进行特定的 WebClient 设置,而不是与整个应用程序相关联,而是与特定路由相关联并使其应用于该路由?

Spring DSL 可以进行这种配置吗?

解决方法

在那里应用的方式相当复杂:

  1. 创建 2 个路由,第一个首先执行并且只执行一次并且触发特定 bean 的特定方法,通过方法参数传递 WebClient.builder() 的设置并执行对 WebFlux 的订阅.是的,反应流设置是在 Spring Boot 应用程序的 Spring 上下文中完成的,而不是在 Apache Camel 上下文中完成。因此它与路由没有直接关联,而不是在特定路由启动时被调用。所以路线看起来像:

     <?xml version="1.0" encoding="UTF-8"?>
    
  1. 提供豆子。我已经把它放到 Spring Boot 应用程序中,而不是像下面这样的 Apache Camel 上下文。这里的缺点是,无论特定路线是否有效,我都必须将其放在这里。所以它一直在记忆中。

     import org.apache.camel.CamelContext;
     import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
     import org.json.JSONArray;
     import org.json.JSONObject;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     import org.springframework.core.ParameterizedTypeReference;
     import org.springframework.http.HttpHeaders;
     import org.springframework.http.MediaType;
     import org.springframework.http.codec.ServerSentEvent;
     import org.springframework.stereotype.Component;
     import org.springframework.web.reactive.function.BodyInserters;
     import org.springframework.web.reactive.function.client.WebClient;
     import reactor.core.publisher.Flux;    
     import java.time.ZoneId;
     import java.time.ZonedDateTime;
     import java.util.ArrayList;
    
     @Component
     public class WebFluxSetUp {
         private final Logger logger = LoggerFactory.getLogger(WebFluxSetUp.class);
         private final CamelContext camelContext;
         private final CamelReactiveStreamsService camelReactiveStreamsService;
    
         WebFluxSetUp(CamelContext camelContext,CamelReactiveStreamsService camelReactiveStreamsService) {
             this.camelContext = camelContext;
             this.camelReactiveStreamsService = camelReactiveStreamsService;
         }
    
         public void executeWebfluxSetup(boolean restart,String initialRequestUri,String restartRequestBody,String initialRequestBody,String apiName,String streamName) {
             {
                 WebClient.builder().defaultHeader(HttpHeaders.CONTENT_TYPE,MediaType.TEXT_XML_VALUE).build().post().uri(initialRequestUri).body(BodyInserters.fromObject(restart ? String.format(restartRequestBody,ZonedDateTime.now(ZoneId.of("UTC")).toString().replace("[UTC]","")) : initialRequestBody)).retrieve().bodyToMono(String.class).map(initResp -> new JSONObject(initResp).getJSONObject("RESPONSE").getJSONArray("RESULT").getJSONObject(0).getJSONObject("INFO").getString("SSEURL")).flatMapMany(url -> {
                     logger.info(url);
                     return WebClient.create().get().uri(url).retrieve().bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
                     }).flatMap(sse -> {
                         JSONArray data = new JSONObject(sse.data()).getJSONObject("RESPONSE").getJSONArray("RESULT").getJSONObject(0).getJSONArray(apiName);
                         ArrayList<String> list = new ArrayList<String>();
                         for (int i = 0; i < data.length(); i++) {
                             list.add(data.getJSONObject(i).toString());
                         }
                         return Flux.fromIterable(list);
                     });
                 }).onBackpressureBuffer().flatMap(msg -> camelReactiveStreamsService.toStream(streamName,msg,String.class)).doFirst(() -> logger.info(String.format("Reactive stream %s was %s",streamName,restart ? "restarted" : "started"))).doOnError(err -> {
                     logger.error(String.format("Reactive stream %s has terminated with error,restarting",streamName),err);
                     executeWebfluxSetup(true,initialRequestUri,restartRequestBody,initialRequestBody,apiName,streamName);
                 }).doOnComplete(() -> {
                     logger.warn(String.format("Reactive stream %s has completed,streamName));
                     executeWebfluxSetup(true,streamName);
                 }).subscribe();
             }
         }
     }
    
  2. 其他缺点是当路由停止时,WebFlux 客户端仍在尝试向反应流 url 发送垃圾邮件。并且没有与路由相关的 api/事件处理程序来阻止它并使未编码到特定路由。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。