微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

java – Spring集成 – AbstractInboundFileSynchronizer不更新文件

我会期望ftp同步机制更新一个更改的文件.但是,从这里可以看到,仅当文件不存在时才下载该文件.至于现在,即使时间戳记/内容已经更改,文件也不会在本地保存.

所以这是我到目前为止发现的

类org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer

@Override
    public void synchronizetoLocalDirectory(final File localDirectory) {
        final String remoteDirectory = this.remoteDirectoryExpression.getValue(this.evaluationContext,String.class);
        try {
            int transferred = this.remoteFileTemplate.execute(new SessionCallback<F,Integer>() {

                @Override
                public Integer doInSession(Session<F> session) throws IOException {
                    F[] files = session.list(remoteDirectory);
                    if (!ObjectUtils.isEmpty(files)) {
                        List<F> filteredFiles = filterFiles(files);
                        for (F file : filteredFiles) {
                            try {
                                if (file != null) {
                                    copyFiletoLocalDirectory(
                                            remoteDirectory,file,localDirectory,session);
                                }
                            }
                            catch (RuntimeException e) {
                                if (AbstractInboundFileSynchronizer.this.filter instanceof reversibleFileListFilter) {
                                    ((reversibleFileListFilter<F>) AbstractInboundFileSynchronizer.this.filter)
                                            .rollback(file,filteredFiles);
                                }
                                throw e;
                            }
                            catch (IOException e) {
                                if (AbstractInboundFileSynchronizer.this.filter instanceof reversibleFileListFilter) {
                                    ((reversibleFileListFilter<F>) AbstractInboundFileSynchronizer.this.filter)
                                            .rollback(file,filteredFiles);
                                }
                                throw e;
                            }
                        }
                        return filteredFiles.size();
                    }
                    else {
                        return 0;
                    }
                }
            });
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(transferred + " files transferred");
            }
        }
        catch (Exception e) {
            throw new MessagingException("Problem occurred while synchronizing remote to local directory",e);
        }
    }

过滤要下载的文件.我想使用org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter,它比较文件名和最后修改的日期.

然后,它将使用已过滤的文件(待复制)调用copyFiletoLocalDirectory函数.

protected void copyFiletoLocalDirectory(String remoteDirectoryPath,F remoteFile,File localDirectory,Session<F> session) throws IOException {
        String remoteFileName = this.getFilename(remoteFile);
        String localFileName = this.generateLocalFileName(remoteFileName);
        String remoteFilePath = remoteDirectoryPath != null
                ? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
                : remoteFileName;
        if (!this.isFile(remoteFile)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("cannot copy,not a file: " + remoteFilePath);
            }
            return;
        }

        File localFile = new File(localDirectory,localFileName);
        if (!localFile.exists()) {
            String tempFileName = localFile.getAbsolutePath() + this.temporaryFileSuffix;
            File tempFile = new File(tempFileName);
            OutputStream outputStream = new bufferedoutputstream(new FileOutputStream(tempFile));
            try {
                session.read(remoteFilePath,outputStream);
            }
            catch (Exception e) {
                if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                }
                else {
                    throw new MessagingException("Failure occurred while copying from remote to local directory",e);
                }
            }
            finally {
                try {
                    outputStream.close();
                }
                catch (Exception ignored2) {
                }
            }

            if (tempFile.renameto(localFile)) {
                if (this.deleteRemoteFiles) {
                    session.remove(remoteFilePath);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("deleted " + remoteFilePath);
                    }
                }
            }
            if (this.preserveTimestamp) {
                localFile.setLastModified(getModified(remoteFile));
            }
        }
    }

但是,如果该文件已经存在于本地磁盘上,则此方法将仅检查(仅基于文件名),如果该文件不存在则仅下载.所以基本上没有机会下载更新的文件(带有新的时间戳).

我玩的时候试图改变FtpInboundFileSynchronizer,但它变得太复杂了.什么是“定制”Synchron- / copyToLocalDirectory方法的最佳方式?

解决方法

可以更新AbstractInboundFileSynchronizer以识别更新的文件,但它很脆弱,您遇到其他问题.

更新13 / Nov / 2016:找出如何在几秒钟内获得修改时间戳.

更新AbstractInboundFileSynchronizer的主要问题是它具有setter方法,但没有(protected)getter方法.如果将来,setter方法做得很聪明,这里介绍的更新版本会中断.

在本地目录中更新文件的主要问题是并发性:如果正在接收更新的同时处理本地文件,则可能会遇到各种麻烦.简单的方法是将本地文件移动到(临时)处理目录,以便可以接收更新作为新文件,从而无需更新AbstractInboundFileSynchronizer.另见骆驼时间戳remarks.

认情况下,FTP服务器会在几分钟内提供修改时间戳.对于测试,我更新了FTP客户端以使用MLSD命令,该命令在几秒钟内提供修改时间戳(如果幸运的话,则为毫秒),但并不是所有的FTP服务器都支持这一点.

Spring FTP reference所述,本地文件过滤器需要是FileSystemPersistentAcceptOnceFileListFilter,以确保修改时间戳更改时本地文件被拾取.

在我更新的AbstractInboundFileSynchronizer的版本之下,然后是我使用的一些测试类.

public class FtpUpdatingFileSynchronizer extends FtpInboundFileSynchronizer {

    protected final Log logger = LogFactory.getLog(this.getClass());

    private volatile Expression localFilenameGeneratorExpression;
    private volatile EvaluationContext evaluationContext;
    private volatile boolean deleteRemoteFiles;
    private volatile String remoteFileSeparator = "/";
    private volatile boolean  preserveTimestamp;

    public FtpUpdatingFileSynchronizer(SessionFactory<FTPFile> sessionFactory) {
        super(sessionFactory);
        setPreserveTimestamp(true);
    }

    @Override
    public void setLocalFilenameGeneratorExpression(Expression localFilenameGeneratorExpression) {
        super.setLocalFilenameGeneratorExpression(localFilenameGeneratorExpression);
        this.localFilenameGeneratorExpression = localFilenameGeneratorExpression;
    }

    @Override
    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
        super.setIntegrationEvaluationContext(evaluationContext);
        this.evaluationContext = evaluationContext;
    }

    @Override
    public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
        super.setDeleteRemoteFiles(deleteRemoteFiles);
        this.deleteRemoteFiles = deleteRemoteFiles;
    }

    @Override
    public void setRemoteFileSeparator(String remoteFileSeparator) {
        super.setRemoteFileSeparator(remoteFileSeparator);
        this.remoteFileSeparator = remoteFileSeparator;
    }

    @Override
    public void setPreserveTimestamp(boolean preserveTimestamp) {
        // updated
        Assert.isTrue(preserveTimestamp,"for updating timestamps must be preserved");
        super.setPreserveTimestamp(preserveTimestamp);
        this.preserveTimestamp = preserveTimestamp;
    }

    @Override
    protected void copyFiletoLocalDirectory(String remoteDirectoryPath,FTPFile remoteFile,Session<FTPFile> session) throws IOException {

        String remoteFileName = this.getFilename(remoteFile);
        String localFileName = this.generateLocalFileName(remoteFileName);
        String remoteFilePath = (remoteDirectoryPath != null
                ? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
                        : remoteFileName);

        if (!this.isFile(remoteFile)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("cannot copy,not a file: " + remoteFilePath);
            }
            return;
        }

        // start update
        File localFile = new File(localDirectory,localFileName);
        boolean update = false;
        if (localFile.exists()) {
            if (this.getModified(remoteFile) > localFile.lastModified()) {
                this.logger.info("Updating local file " + localFile);
                update = true;
            } else {
                this.logger.info("File already exists: " + localFile);
                return;
            }
        }
        // end update

        String tempFileName = localFile.getAbsolutePath() + this.getTemporaryFileSuffix();
        File tempFile = new File(tempFileName);
        OutputStream outputStream = new bufferedoutputstream(new FileOutputStream(tempFile));
        try {
            session.read(remoteFilePath,outputStream);
        } catch (Exception e) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            else {
                throw new MessagingException("Failure occurred while copying from remote to local directory",e);
            }
        } finally {
            try {
                outputStream.close();
            }
            catch (Exception ignored2) {
            }
        }
        // updated
        if (update && !localFile.delete()) {
            throw new MessagingException("Unable to delete local file [" + localFile + "] for update.");
        }
        if (tempFile.renameto(localFile)) {
            if (this.deleteRemoteFiles) {
                session.remove(remoteFilePath);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("deleted " + remoteFilePath);
                }
            }
            // updated
            this.logger.info("Stored file locally: " + localFile);
        } else {
            // updated
            throw new MessagingException("Unable to rename temporary file [" + tempFile + "] to [" + localFile + "]");
        }
        if (this.preserveTimestamp) {
            localFile.setLastModified(getModified(remoteFile));
        }
    }

    private String generateLocalFileName(String remoteFileName) {

        if (this.localFilenameGeneratorExpression != null) {
            return this.localFilenameGeneratorExpression.getValue(this.evaluationContext,remoteFileName,String.class);
        }
        return remoteFileName;
    }

}

遵循我使用的一些测试类.
我使用依赖关系org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE和org.apache.ftpserver:ftpserver-core:1.0.6(加上通常的日志和测试依赖项).

public class TestFtpSync {

    static final Logger log = LoggerFactory.getLogger(TestFtpSync.class);
    static final String FTP_ROOT_DIR = "target" + File.separator + "ftproot";
    // org.apache.ftpserver:ftpserver-core:1.0.6
    static FtpServer server;

    @BeforeClass
    public static void startServer() throws FtpException {

        File ftpRoot = new File (FTP_ROOT_DIR);
        ftpRoot.mkdirs();
        TestUserManager userManager = new TestUserManager(ftpRoot.getAbsolutePath());
        FtpServerFactory serverFactory = new FtpServerFactory();
        serverFactory.setUserManager(userManager);
        ListenerFactory factory = new ListenerFactory();
        factory.setPort(4444);
        serverFactory.addListener("default",factory.createListener());
        server = serverFactory.createServer();
        server.start();
    }

    @AfterClass
    public static void stopServer() {

        if (server != null) {
            server.stop();
        }
    }

    File ftpFile = Paths.get(FTP_ROOT_DIR,"test1.txt").toFile();
    File ftpFile2 = Paths.get(FTP_ROOT_DIR,"test2.txt").toFile();

    @Test
    public void syncDir() {

        // org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
        try {
            ctx.register(FtpSyncConf.class);
            ctx.refresh();
            PollableChannel msgChannel = ctx.getBean("inputChannel",PollableChannel.class);
            for (int j = 0; j < 2; j++) {
                for (int i = 0; i < 2; i++) {
                    storeFtpFile();
                }
                for (int i = 0; i < 4; i++) {
                    fetchMessage(msgChannel);
                }
            }
        } catch (Exception e) {
            throw new AssertionError("FTP test Failed.",e);
        } finally {
            ctx.close();
            cleanup();
        }
    }

    boolean tswitch = true;

    void storeFtpFile() throws IOException,InterruptedException {

        File f = (tswitch ? ftpFile : ftpFile2);
        tswitch = !tswitch;
        log.info("Writing message " + f.getName());
        Files.write(f.toPath(),("Hello " + System.currentTimeMillis()).getBytes());
    }

    Message<?> fetchMessage(PollableChannel msgChannel) {

        log.info("Fetching message.");
        Message<?> msg = msgChannel.receive(1000L);
        if (msg == null) {
            log.info("No message.");
        } else {
            log.info("Have a message: " + msg);
        }
        return msg;
    }

    void cleanup() {

        delFile(ftpFile);
        delFile(ftpFile2);
        File d = new File(FtpSyncConf.LOCAL_DIR);
        if (d.isDirectory()) {
            for (File f : d.listFiles()) {
                delFile(f);
            }
        }
        log.info("Finished cleanup");
    }

    void delFile(File f) {

        if (f.isFile()) {
            if (f.delete()) {
                log.info("Deleted " + f);
            } else {
                log.error("Cannot delete file " + f);
            }
        }
    }

}

public class MlistFtpSessionFactory extends AbstractFtpSessionFactory<MlistFtpClient> {

    @Override
    protected MlistFtpClient createClientInstance() {
        return new MlistFtpClient();
    }

}

public class MlistFtpClient extends FTPClient {

    @Override
    public FTPFile[] listFiles(String pathname) throws IOException {
        return super.mlistDir(pathname);
    }
}

@EnableIntegration
@Configuration
public class FtpSyncConf {

    private static final Logger log = LoggerFactory.getLogger(FtpSyncConf.class);

    public static final String LOCAL_DIR = "/tmp/received";

    @Bean(name = "ftpMetaData")
    public ConcurrentMetadataStore ftpMetaData() {
        return new SimpleMetadataStore();
    }

    @Bean(name = "localMetaData")
    public ConcurrentMetadataStore localMetaData() {
        return new SimpleMetadataStore();
    }

    @Bean(name = "ftpFileSyncer")
    public FtpUpdatingFileSynchronizer ftpFileSyncer(
            @Qualifier("ftpMetaData") ConcurrentMetadataStore MetadataStore) {

        MlistFtpSessionFactory ftpSessionFactory = new MlistFtpSessionFactory();
        ftpSessionFactory.setHost("localhost");
        ftpSessionFactory.setPort(4444);
        ftpSessionFactory.setUsername("demo");
        ftpSessionFactory.setPassword("demo");

        FtpPersistentAcceptOnceFileListFilter fileFilter = new FtpPersistentAcceptOnceFileListFilter(MetadataStore,"ftp");
        fileFilter.setFlushOnUpdate(true);
        FtpUpdatingFileSynchronizer ftpFileSync = new FtpUpdatingFileSynchronizer(ftpSessionFactory);
        ftpFileSync.setFilter(fileFilter);
        // ftpFileSync.setDeleteRemoteFiles(true);
        return ftpFileSync;
    }

    @Bean(name = "syncFtp")
    @InboundChannelAdapter(value = "inputChannel",poller = @Poller(fixedDelay = "500",maxMessagesPerPoll = "1"))
    public MessageSource<File> syncChannel(
            @Qualifier("localMetaData") ConcurrentMetadataStore MetadataStore,@Qualifier("ftpFileSyncer") FtpUpdatingFileSynchronizer ftpFileSync) throws Exception {

        FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(ftpFileSync);
        File receiveDir = new File(LOCAL_DIR);
        receiveDir.mkdirs();
        messageSource.setLocalDirectory(receiveDir);
        messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(MetadataStore,"local"));
        log.info("Message source bean created.");
        return messageSource;
    }

    @Bean(name = "inputChannel")
    public PollableChannel inputChannel() {

        QueueChannel channel = new QueueChannel();
        log.info("Message channel bean created.");
        return channel;
    }

}

/**
 * copied from https://github.com/spring-projects/spring-integration-samples/tree/master/basic/ftp/src/test/java/org/springframework/integration/samples/ftp/support
 * @author Gunnar Hillert
 *
 */
public class TestUserManager extends AbstractUserManager {
    private BaseUser testUser;
    private BaseUser anonUser;

    private static final String TEST_USERNAME = "demo";
    private static final String TEST_PASSWORD = "demo";

    public TestUserManager(String homeDirectory) {
        super("admin",new ClearTextPasswordEncryptor());

        testUser = new BaseUser();
        testUser.setAuthorities(Arrays.asList(new Authority[] {new ConcurrentLoginPermission(1,1),new WritePermission()}));
        testUser.setEnabled(true);
        testUser.setHomeDirectory(homeDirectory);
        testUser.setMaxIdleTime(10000);
        testUser.setName(TEST_USERNAME);
        testUser.setPassword(TEST_PASSWORD);

        anonUser = new BaseUser(testUser);
        anonUser.setName("anonymous");
    }

    public User getUserByName(String username) throws FtpException {
        if(TEST_USERNAME.equals(username)) {
            return testUser;
        } else if(anonUser.getName().equals(username)) {
            return anonUser;
        }

        return null;
    }

    public String[] getAllUserNames() throws FtpException {
        return new String[] {TEST_USERNAME,anonUser.getName()};
    }

    public void delete(String username) throws FtpException {
        throw new UnsupportedOperationException("Deleting of FTP Users is not supported.");
    }

    public void save(User user) throws FtpException {
        throw new UnsupportedOperationException("Saving of FTP Users is not supported.");
    }

    public boolean doesExist(String username) throws FtpException {
        return (TEST_USERNAME.equals(username) || anonUser.getName().equals(username)) ? true : false;
    }

    public User authenticate(Authentication authentication) throws AuthenticationFailedException {
        if(UsernamePasswordAuthentication.class.isAssignableFrom(authentication.getClass())) {
            UsernamePasswordAuthentication upAuth = (UsernamePasswordAuthentication) authentication;

            if(TEST_USERNAME.equals(upAuth.getUsername()) && TEST_PASSWORD.equals(upAuth.getpassword())) {
                return testUser;
            }

            if(anonUser.getName().equals(upAuth.getUsername())) {
                return anonUser;
            }
        } else if(AnonymousAuthentication.class.isAssignableFrom(authentication.getClass())) {
            return anonUser;
        }

        return null;
    }
}

更新15 / Nov / 2016:xml配置注意事项.

通过spring-integration-ftp-4.3.5.RELEASE.jar!/ meta-inf / spring,通过FtpNamespaceHandler通过org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser将xml-element入站通道适配器直接链接到FtpInboundFileSynchronizer .handlers.
遵循xml-custom参考指南,在本地meta-inf / spring.handlers文件中指定自定义FtpNamespaceHandler应允许您使用FtpUpdatingFileSynchronizer而不是FtpInboundFileSynchronizer.它并没有为我的单元测试工作,一个正确的解决方案可能涉及创建额外的/修改的xsd文件,以便常规入站通道适配器正在使用常规FtpInboundFileSynchronizer和一个特殊的入站更新通道适配器是使用FtpUpdatingFileSynchronizer.正确地做这个是有点超出了这个答案的范围.
一个快速的黑客可以让你开始.您可以通过在本地项目中创建包org.springframework.integration.ftp.config和类FtpNamespaceHandler来覆盖认的FtpNamespaceHandler.内容如下:

package org.springframework.integration.ftp.config;

public class FtpNamespaceHandler extends org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler {

    @Override
    public void init() {
        System.out.println("Initializing FTP updating file synchronizer.");
        // one updated line below,rest copied from original FtpNamespaceHandler
        registerBeanDeFinitionParser("inbound-channel-adapter",new MyFtpInboundChannelAdapterParser());
        registerBeanDeFinitionParser("inbound-streaming-channel-adapter",new FtpStreamingInboundChannelAdapterParser());
        registerBeanDeFinitionParser("outbound-channel-adapter",new FtpOutboundChannelAdapterParser());
        registerBeanDeFinitionParser("outbound-gateway",new FtpOutboundGatewayParser());
    }

}

package org.springframework.integration.ftp.config;

import org.springframework.integration.file.remote.synchronizer.InboundFileSynchronizer;
import org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser;

public class MyFtpInboundChannelAdapterParser extends FtpInboundChannelAdapterParser {

    @Override
    protected Class<? extends InboundFileSynchronizer> getInboundFileSynchronizerClass() {
        System.out.println("Returning updating file synchronizer.");
        return FtpUpdatingFileSynchronizer.class;
    }

}

另外在xml文件添加preserve-timestamp =“true”,以防止新的IllegalArgumentException:必须保留更新时间戳.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐