微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

出站适配器中出现错误后,Spring DSL Integration Flow发布消息

如何解决出站适配器中出现错误后,Spring DSL Integration Flow发布消息

我有一个集成流程。

IntegrationFlows.from(
            Sftp.inboundAdapter(inboundSftp)
            .localDirectory(this.getlocalDirectory(config.getId()))
            .deleteRemoteFiles(true)
            .autocreateLocalDirectory(true)
            .filter(new CompositeFileListFilter().addFilter(new LastModifiedLsEntryFileListFilter(config.getRegexFilter())))
            .remoteDirectory(config.getInboundDirectory()),e -> e.poller(
                    Pollers.cron(config.getCron())
                    .errorChannel(MessageHeaders.ERROR_CHANNEL).errorHandler((ex) -> {
                        try {

                            this.destroy(String.valueOf(config.getId()));
                            configurationService.removeConfigurationChannelById(config.getId());
                            //loggin here

                            }
                        } catch (Exception ex1) {
                            Logger.getLogger(ExceptionAspect.class.getName()).log(Level.SEVERE,null,ex1);
                        }
                    })
                    .advice(startup.scanRemoteDirectory())
            ))
            .transform(
                file -> util.transform((File) file,config.getSourceEncoding(),config.getTargetEncoding(),doEncoding,doZip))
            .publishSubscribeChannel(s -> s
                .subscribe(f -> {
                    f.handle(Sftp.outboundAdapter(outboundSftp)
                        .useTemporaryFileName(false)
                        .autocreateDirectory(true)
                        .remoteDirectory(config.getoutboundDirectory()),c -> c.advice(startup.deleteFileAdvice()));

                })
                .subscribe(f -> {

                    if(doArchive) {
                        f.handle(Sftp.outboundAdapter(inboundSftp)
                            .useTemporaryFileName(false)
                            .autocreateDirectory(true)
                            .remoteDirectory(config.getInboundArchiveDirectory()));
                    } else {
                        f.handle(m -> {});
                    }

                })
                .subscribe(f -> f
                    .handle(m -> {

                        // file transfer logging here
                        if(doArchive) {
                          // file archived logging here
                         }
                        }
                    })
                )
            )
            .get();

在第一个订户中,如果文件上传失败,它仍在打印日志,表明该文件已传输,但实际上不是。

我理解的消息将传递给每个订阅者,当第一个订阅者完成其工作时,它将发送给下一个订阅者。

在我的情况下,第一个订阅者实际上无法上传文件。建议不删除文件

我确实做了一些尝试。

IntegrationFlows.from(
                Sftp.inboundAdapter(inboundSftp)
               ..........
                ))
                .transform(....
                   )
                .publishSubscribeChannel(s -> s
                    .subscribe(f -> {
                        f.handle(Sftp.outboundAdapter(outboundSftp)
                            .useTemporaryFileName(false)
                            .autocreateDirectory(true)
                            .remoteDirectory(config.getoutboundDirectory()),c -> c.advice(startup.deleteFileAdvice()));

                    }).publishSubcribeChannel(s -> s
                      .subscribe(f -> {

                        if(doArchive) {
                            f.handle(Sftp.outboundAdapter(inboundSftp)
                                .useTemporaryFileName(false)
                                .autocreateDirectory(true)
                                .remoteDirectory(config.getInboundArchiveDirectory()));
                        } else {
                            f.handle(m -> {});
                        }

                    })
                    .subscribe(f -> f
                        .handle(m -> {

                            // file transfer logging here
                            if(doArchive) {
                              // file archived logging here
                             }
                            }
                        })
                    )
                    )
                )
                .get();

我尝试在outboundAdapter订户之后再次发布,但是错误地它仍然发布。

还尝试在outboundAdapter订户中添加.errorChannel,对我不起作用。

      .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__,| / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.2.RELEASE)

2020-08-20 11:10:53,265 INFO c.t.i.s.I2SftpApplication - Starting I2SftpApplication on GSEUC5CG8393GR5 with PID 21636 (C:\Users\MuhammadUmair\IdeaProjects\i2sftpinboudservice\target\classes started by MuhammadUmair in C:\Users\MuhammadUmair\IdeaProjects\i2sftpinboudservice)
2020-08-20 11:10:53,269 INFO c.t.i.s.I2SftpApplication - No active profile set,falling back to default profiles: default
2020-08-20 11:10:54,464 INFO o.s.i.c.DefaultConfiguringbeanfactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore,a default PublishSubscribeChannel will be created.
2020-08-20 11:10:54,478 INFO o.s.i.c.DefaultConfiguringbeanfactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore,a default DefaultHeaderChannelRegistry will be created.
2020-08-20 11:10:54,577 INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.beanfactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-08-20 11:10:54,584 INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationdisposableautocreatedBeans' of type [org.springframework.integration.config.annotation.disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-08-20 11:10:54,597 INFO o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-08-20 11:10:54,906 INFO o.s.b.w.e.t.TomcatWebServer - Tomcat initialized with port(s): 8080 (http)
2020-08-20 11:10:54,917 INFO o.a.c.h.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8080"]
2020-08-20 11:10:54,917 INFO o.a.c.c.StandardService - Starting service [Tomcat]
2020-08-20 11:10:54,917 INFO o.a.c.c.StandardEngine - Starting Servlet engine: [Apache Tomcat/9.0.29]
2020-08-20 11:10:55,095 INFO o.a.c.c.C.[.[.[/] - Initializing Spring embedded WebApplicationContext
2020-08-20 11:10:55,095 INFO o.s.w.c.ContextLoader - Root WebApplicationContext: initialization completed in 1705 ms
2020-08-20 11:10:56,920 INFO o.s.s.c.ThreadPoolTaskExecutor - Initializing ExecutorService 'applicationTaskExecutor'
2020-08-20 11:10:57,243 INFO o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2020-08-20 11:10:57,415 INFO o.s.i.e.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-08-20 11:10:57,416 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.errorChannel' has 1 subscriber(s).
2020-08-20 11:10:57,416 INFO o.s.i.e.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2020-08-20 11:10:57,416 INFO o.s.i.e.EventDrivenConsumer - Adding {message-handler:startup.resultFileHandler.serviceActivator} as a subscriber to the 'fromSftpChannel' channel
2020-08-20 11:10:57,416 INFO o.s.i.c.DirectChannel - Channel 'application.fromSftpChannel' has 1 subscriber(s).
2020-08-20 11:10:57,417 INFO o.s.i.e.EventDrivenConsumer - started bean 'startup.resultFileHandler.serviceActivator'
2020-08-20 11:10:57,422 INFO o.s.i.e.sourcePollingChannelAdapter - started bean 'startup.sftpMessageSource.inboundChannelAdapter'
2020-08-20 11:10:57,441 INFO o.a.c.h.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8080"]
2020-08-20 11:10:57,498 INFO o.s.b.w.e.t.TomcatWebServer - Tomcat started on port(s): 8080 (http) with context path ''
2020-08-20 11:10:57,508 INFO c.t.i.s.I2SftpApplication - Started I2SftpApplication in 9.228 seconds (JVM running for 13.133)
Registering an Integration Flow with id : 1
2020-08-20 11:10:57,689 INFO o.s.i.c.DirectChannel - Channel 'application.1.subFlow#2.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,690 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#2.org.springframework.integration.config.ConsumerEndpointfactorybean#1'
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - Adding {bridge} as a subscriber to the '1.subFlow#0.channel#0' channel
2020-08-20 11:10:57,691 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.1.subFlow#0.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#2.org.springframework.integration.config.ConsumerEndpointfactorybean#0'
2020-08-20 11:10:57,691 INFO o.s.i.c.DirectChannel - Channel 'application.1.subFlow#1.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,691 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#1.org.springframework.integration.config.ConsumerEndpointfactorybean#1'
2020-08-20 11:10:57,692 INFO o.s.i.c.PublishSubscribeChannel - Channel 'application.1.subFlow#0.channel#0' has 2 subscriber(s).
2020-08-20 11:10:57,692 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#1.org.springframework.integration.config.ConsumerEndpointfactorybean#0'
2020-08-20 11:10:57,693 INFO o.s.i.e.EventDrivenConsumer - Adding {message-handler} as a subscriber to the '1.subFlow#0.channel#1' channel
2020-08-20 11:10:57,694 INFO o.s.i.c.DirectChannel - Channel 'application.1.subFlow#0.channel#1' has 1 subscriber(s).
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#0.org.springframework.integration.config.ConsumerEndpointfactorybean#1'
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - Adding {bridge} as a subscriber to the '1.subFlow#0.channel#0' channel
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - started bean '1.subFlow#0.org.springframework.integration.config.ConsumerEndpointfactorybean#0'
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - Adding {transformer} as a subscriber to the '1.channel#0' channel
2020-08-20 11:10:57,694 INFO o.s.i.c.DirectChannel - Channel 'application.1.channel#0' has 1 subscriber(s).
2020-08-20 11:10:57,694 INFO o.s.i.e.EventDrivenConsumer - started bean '1.org.springframework.integration.config.ConsumerEndpointfactorybean#0'
2020-08-20 11:10:57,700 INFO o.s.i.e.sourcePollingChannelAdapter - started bean '1.org.springframework.integration.config.sourcePollingChannelAdapterfactorybean#0'
2020-08-20 11:10:58,017 INFO c.j.jsch - Connecting to *.*.*.114 port 22
2020-08-20 11:10:58,341 INFO c.j.jsch - Connection established
2020-08-20 11:10:58,672 INFO c.j.jsch - Remote version string: SSH-2.0-OpenSSH_7.4
2020-08-20 11:10:58,673 INFO c.j.jsch - Local version string: SSH-2.0-JSCH-0.1.54
2020-08-20 11:10:58,674 INFO c.j.jsch - CheckCiphers: aes256-ctr,aes192-ctr,aes128-ctr,aes256-cbc,aes192-cbc,aes128-cbc,3des-ctr,arcfour,arcfour128,arcfour256
2020-08-20 11:10:58,728 INFO c.j.jsch - CheckKexes: diffie-hellman-group14-sha1,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521
2020-08-20 11:10:58,815 INFO c.j.jsch - CheckSignatures: ecdsa-sha2-nistp256,ecdsa-sha2-nistp384,ecdsa-sha2-nistp521
2020-08-20 11:10:58,823 INFO c.j.jsch - SSH_MSG_KEXINIT sent
2020-08-20 11:10:59,036 INFO c.j.jsch - SSH_MSG_KEXINIT received
2020-08-20 11:10:59,038 INFO c.j.jsch - kex: server: curve25519-sha256,curve25519-sha256@libssh.org,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group16-sha512,diffie-hellman-group18-sha512,diffie-hellman-group-exchange-sha1,diffie-hellman-group14-sha256,diffie-hellman-group14-sha1,diffie-hellman-group1-sha1
2020-08-20 11:10:59,038 INFO c.j.jsch - kex: server: ssh-rsa,rsa-sha2-512,rsa-sha2-256,ecdsa-sha2-nistp256,ssh-ed25519
2020-08-20 11:10:59,039 INFO c.j.jsch - kex: server: aes256-ctr,aes128-ctr
2020-08-20 11:10:59,040 INFO c.j.jsch - kex: server: aes256-ctr,040 INFO c.j.jsch - kex: server: hmac-sha2-512,hmac-sha2-256
2020-08-20 11:10:59,041 INFO c.j.jsch - kex: server: hmac-sha2-512,042 INFO c.j.jsch - kex: server: none,zlib@openssh.com
2020-08-20 11:10:59,042 INFO c.j.jsch - kex: server: 
2020-08-20 11:10:59,043 INFO c.j.jsch - kex: server: 
2020-08-20 11:10:59,043 INFO c.j.jsch - kex: client: ecdh-sha2-nistp256,044 INFO c.j.jsch - kex: client: ssh-rsa,ssh-dss,ecdsa-sha2-nistp521
2020-08-20 11:10:59,044 INFO c.j.jsch - kex: client: aes128-ctr,3des-cbc,blowfish-cbc,aes256-ctr,aes256-cbc
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: aes128-ctr,045 INFO c.j.jsch - kex: client: hmac-md5,hmac-sha1,hmac-sha2-256,hmac-sha1-96,hmac-md5-96
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: none
2020-08-20 11:10:59,045 INFO c.j.jsch - kex: client: 
2020-08-20 11:10:59,046 INFO c.j.jsch - kex: client: 
2020-08-20 11:10:59,046 INFO c.j.jsch - kex: server->client aes128-ctr hmac-sha2-256 none
2020-08-20 11:10:59,047 INFO c.j.jsch - kex: client->server aes128-ctr hmac-sha2-256 none
2020-08-20 11:10:59,053 INFO c.j.jsch - SSH_MSG_KEX_ECDH_INIT sent
2020-08-20 11:10:59,053 INFO c.j.jsch - expecting SSH_MSG_KEX_ECDH_REPLY
2020-08-20 11:10:59,385 INFO c.j.jsch - ssh_rsa_verify: signature true
2020-08-20 11:10:59,389 INFO o.s.i.s.s.DefaultSftpSessionFactory - The authenticity of host '*.*.*.114' can't be established.
RSA key fingerprint is 1f:3e:c9:5f:37:00:a1:00:ef:50:59:af:42:98:99:e9.
Are you sure you want to continue connecting?
2020-08-20 11:10:59,390 WARN c.j.jsch - Permanently added '*.*.*.114' (RSA) to the list of kNown hosts.
2020-08-20 11:10:59,390 INFO c.j.jsch - SSH_MSG_NEWKEYS sent
2020-08-20 11:10:59,390 INFO c.j.jsch - SSH_MSG_NEWKEYS received
2020-08-20 11:10:59,395 INFO c.j.jsch - SSH_MSG_SERVICE_REQUEST sent
2020-08-20 11:10:59,718 INFO c.j.jsch - SSH_MSG_SERVICE_ACCEPT received
2020-08-20 11:11:00,047 INFO c.j.jsch - Authentications that can continue: gssapi-with-mic,publickey,keyboard-interactive,password
2020-08-20 11:11:00,047 INFO c.j.jsch - Next authentication method: gssapi-with-mic
2020-08-20 11:11:00,399 INFO c.j.jsch - Authentications that can continue: password
2020-08-20 11:11:00,399 INFO c.j.jsch - Next authentication method: password
2020-08-20 11:11:00,746 INFO c.j.jsch - Authentication succeeded (password).
Total Files: 1
2020-08-20 11:11:27,623 INFO c.t.i.s.s.LastModifiedLsEntryFileListFilter - [OB.xml] old size [null]  increased to [0]...
Total Files: 1
2020-08-20 11:11:29,636 INFO c.t.i.s.s.LastModifiedLsEntryFileListFilter - [OB.xml] old size [0]  increased to [38709]...
Total Files: 1
2020-08-20 11:11:34,256 INFO c.j.jsch - Connecting to *.*.*.115 port 22
2020-08-20 11:11:55,282 INFO c.t.i.c.f.a.LoggingAspect - log message Integration Name=CXML invoice DK Service Name=FileTransferService Source=External Source Interface=/home/umair/accruals/14minute/ Target=NA Target Interface=/home/umair/accruals/15minute/ Content ID=OB.xml Message ID=N/A Category=SUCCESS Timestamp=2020-08-20T11:11:55.281 Message=file OB.xml transferred

按照Artem Bilan的建议,我创建了一个演示应用程序,并上传到了git repo here

服务器配置为here

定义了集成流程here

自述文件file

解决方法

我发现了您的问题:

@Bean
public Advice deleteFileAdvice() {

    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setOnSuccessExpressionString(AppConstants.SUCCESS_EXPRESSION);
    advice.setTrapException(true);
    return advice;
}

查看其JavaDocs:

/**
 * If true,any exception will be caught and null returned.
 * Default false.
 * @param trapException true to trap Exceptions.
 */
public void setTrapException(boolean trapException) {

因此,消息处理程序中的任何异常(包括SFTP连接失败)都将被吞没。因此,请求消息实际上已移至您publishSubcribeChannel配置中的下一个订阅者。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?