微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spring SFTP 出站适配器 - 确定文件何时发送

如何解决Spring SFTP 出站适配器 - 确定文件何时发送

我有一个 Spring SFTP 输出适配器,我通过主程序中的“adapter.start()”启动它。启动后,适配器会按预期传输并上传指定目录中的所有文件。但我想在所有文件传输完毕后停止适配器。如何检测是否所有文件都已传输,以便我可以发出 adapter.stop()?

@Bean
public IntegrationFlow sftpOutboundFlow() {
    return IntegrationFlows.from(Files.inboundAdapter(new File(sftpOutboundDirectory))
                    .filterExpression("name.endsWith('.pdf') OR name.endsWith('.PDF')")
                    .preventDuplicates(true),e -> e.id("sftpOutboundAdapter")
                    .autoStartup(false)
                    .poller(Pollers.trigger(new FireOnceTrigger())
                            .maxMessagesPerPoll(-1)))
            .log(LoggingHandler.Level.INFO,"sftp.outbound",m -> m.getPayload())
            .log(LoggingHandler.Level.INFO,m -> m.getHeaders())
            .handle(Sftp.outboundAdapter(outboundSftpSessionFactory())
                    .useTemporaryFileName(false)
                    .remoteDirectory(sftpRemoteDirectory))
            .get();
}

解决方法

但我想在所有文件传输完毕后停止适配器。

从逻辑上讲,这不是为这种组件设计的。由于您不会有一些不断变化的本地目录,因此最好考虑通过某些操作列出目录中的文件的偶数驱动程序解决方案。是的,它可以是来自 main 的调用,但对于目录的所有内容只能调用一次,仅此而已。

因此,带有 Sftp.outboundGateway()Command.MPUT 适合您:

https://docs.spring.io/spring-integration/reference/html/sftp.html#using-the-mput-command

您仍然可以触发 IntegrationFlow,但它可以从 @MessagingGateway 接口开始,从具有本地目录的 main 调用以列出要上传的文件:

https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl-gateway

,

@Artem Bilan 已经给出了答案。但这是他所说的具体实现 - 对于像我这样的 Spring Integration 菜鸟:

  1. 定义一个服务来按需获取 PDF 文件:
@Service
public class MyFileService {
    public List<File> getPdfFiles(final String srcDir) {
        File[] files = new File(srcDir).listFiles((dir,name) -> name.toLowerCase().endsWith(".pdf"));
        return Arrays.asList(files == null ? new File[]{} : files);
    }
}
  1. 定义网关以按需启动 SFTP 上传流程:
@MessagingGateway
public interface SFtpOutboundGateway {
    @Gateway(requestChannel = "sftpOutboundFlow.input")
    void uploadFiles(List<File> files);
}
  1. 定义集成流程以通过 Sftp.outboundGateway 将文件上传到 SFTP 服务器:
@Configuration
@EnableIntegration
public class FtpFlowIntegrationConfig {
    // could be also bound via @Value 
    private String sftpRemoteDirectory = "/path/to/remote/dir";

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22222);
        factory.setUser("client1");
        factory.setPassword("password123");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public IntegrationFlow sftpOutboundFlow(RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate) {
        return e -> e
                .log(LoggingHandler.Level.INFO,"sftp.outbound",Message::getPayload)
                .log(LoggingHandler.Level.INFO,Message::getHeaders)
                .handle(
                    Sftp.outboundGateway(remoteFileTemplate,AbstractRemoteFileOutboundGateway.Command.MPUT,"payload")
                );
    }

    @Bean
    public RemoteFileTemplate<ChannelSftp.LsEntry> remoteFileTemplate(SessionFactory<ChannelSftp.LsEntry> outboundSftpSessionFactory) {
        RemoteFileTemplate<ChannelSftp.LsEntry> template = new SftpRemoteFileTemplate(outboundSftpSessionFactory);
        template.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
        template.setAutoCreateDirectory(true);
        template.afterPropertiesSet();
        template.setUseTemporaryFileName(false);
        return template;
    }
}

接线:

public class SpringApp {
    public static void main(String[] args) {
        final MyFileService fileService = ctx.getBean(MyFileService.class);
        final SFtpOutboundGateway sFtpOutboundGateway = ctx.getBean(SFtpOutboundGateway.class);
        // trigger the sftp upload flow manually - only once
        sFtpOutboundGateway.uploadFiles(fileService.getPdfFiles()); 
    }
}

导入注释:

1.

@Gateway(requestChannel = "sftpOutboundFlow.input") void uploadFiles(List files);

此处 DirectChannel 通道 sftpOutboundFlow.input 将用于将带有有效载荷 (= List<File> files) 的消息传递给接收器。如果尚未创建此通道,网关将隐式创建它。

2.

@豆 公共集成流 sftpOutboundFlow(RemoteFileTemplate remoteFileTemplate) { ... }

由于 IntegrationFlow 是一个 Consumer 函数式接口,我们可以使用 IntegrationFlowDefinition 稍微简化流程。在 bean 注册阶段,IntegrationFlowBeanPostProcessor 将此内联 (Lambda) IntegrationFlow 转换为 StandardIntegrationFlow 并处理其组件。使用 Lambda 的 IntegrationFlow 定义将 DirectChannel 作为流的 inputChannel 填充,并在应用程序上下文中注册为上面示例中名称为 sftpOutboundFlow.input 的 bean(流 bean 名称 +“.input”)。这就是我们为 SFtpOutboundGateway 网关使用该名称的原因。

参考:https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial

3.

@豆 public RemoteFileTemplate remoteFileTemplate(SessionFactory outboundSftpSessionFactory) {}

见:Remote directory for sftp outbound gateway with DSL

流程图:

enter image description here

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?