FTP 串流輸入通道配接器

版本 4.3 引入了串流輸入通道配接器。此配接器產生帶有 InputStream 類型有效負載的訊息,允許在不寫入本機檔案系統的情況下提取檔案。由於工作階段保持開啟狀態,因此使用應用程式負責在檔案被使用後關閉工作階段。工作階段在 closeableResource 標頭 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE) 中提供。標準框架元件,例如 FileSplitterStreamTransformer,會自動關閉工作階段。請參閱 檔案分割器串流轉換器 以取得關於這些元件的更多資訊。以下範例顯示如何設定 inbound-streaming-channel-adapter

<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>

僅允許使用 filename-patternfilename-regexfilterfilter-expression 其中之一。

從版本 5.0 開始,預設情況下,FtpStreamingMessageSource 配接器會使用基於記憶體內 SimpleMetadataStoreFtpPersistentAcceptOnceFileListFilter 來防止遠端檔案重複。預設情況下,此篩選器也適用於檔案名稱模式(或 regex)。如果您需要允許重複,可以使用 AcceptAllFileListFilter。任何其他用例都可以由 CompositeFileListFilter(或 ChainFileListFilter)處理。Java 設定 (稍後在文件中) 展示了一種在處理後移除遠端檔案以避免重複的技術。

有關 FtpPersistentAcceptOnceFileListFilter 及其使用方式的更多資訊,請參閱 遠端持久檔案列表篩選器

當需要提取時,使用 max-fetch-size 屬性來限制每次輪詢提取的檔案數量。在叢集環境中執行時,將其設定為 1 並使用持久性篩選器。請參閱 輸入通道配接器:控制遠端檔案提取 以取得更多資訊。

配接器將遠端目錄和檔案名稱分別放在 FileHeaders.REMOTE_DIRECTORYFileHeaders.REMOTE_FILE 標頭中。從版本 5.0 開始,FileHeaders.REMOTE_FILE_INFO 標頭提供額外的遠端檔案資訊(預設以 JSON 表示)。如果您將 FtpStreamingMessageSource 上的 fileInfoJson 屬性設定為 false,則標頭包含 FtpFileInfo 物件。可以使用 FtpFileInfo.getFileInfo() 方法存取底層 Apache Net 程式庫提供的 FTPFile 物件。當您使用 XML 設定時,fileInfoJson 屬性不可用,但您可以透過將 FtpStreamingMessageSource 注入到您的設定類別之一來設定它。另請參閱 遠端檔案資訊

從版本 5.1 開始,comparator 的泛型類型為 FTPFile。先前,它是 AbstractFileInfo<FTPFile>。這是因為排序現在在處理的早期執行,在篩選和應用 maxFetch 之前。

使用 Java 設定進行設定

以下 Spring Boot 應用程式顯示如何使用 Java 設定設定輸入配接器的範例

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("ftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public FtpRemoteFileTemplate template() {
        return new FtpRemoteFileTemplate(ftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

請注意,在本範例中,轉換器下游的訊息處理器具有一個 advice,可在處理後移除遠端檔案。