SFTP 串流輸入通道配接器

Version 4.3 引入了串流輸入通道配接器。此配接器產生酬載類型為 InputStream 的訊息,讓您無需寫入本機檔案系統即可擷取檔案。由於工作階段保持開啟狀態,因此取用應用程式負責在檔案被取用後關閉工作階段。工作階段在 closeableResource 標頭 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE) 中提供。標準框架元件,例如 FileSplitterStreamTransformer,會自動關閉工作階段。有關這些元件的更多資訊,請參閱 檔案分割器串流轉換器。以下範例展示如何設定 SFTP 串流輸入通道配接器

<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>

您只能使用 filename-patternfilename-regexfilterfilter-expression 其中之一。

從 5.0 版開始,預設情況下,SftpStreamingMessageSource 配接器會使用基於記憶體內 SimpleMetadataStoreSftpPersistentAcceptOnceFileListFilter 來防止遠端檔案重複。預設情況下,此篩選器也與檔案名稱模式(或 regex)一起應用。如果您需要允許重複,可以使用 AcceptAllFileListFilter。您可以透過使用 CompositeFileListFilter(或 ChainFileListFilter)來處理任何其他用例。稍後顯示的 Java 設定展示了一種在處理後移除遠端檔案的技術,以避免重複。

有關 SftpPersistentAcceptOnceFileListFilter 及其使用方式的更多資訊,請參閱 遠端持久性檔案列表篩選器

您可以使用 max-fetch-size 屬性來限制每次輪詢時擷取的檔案數量(當需要擷取時)。在叢集環境中執行時,將其設定為 1 並使用持久性篩選器。有關更多資訊,請參閱 輸入通道配接器:控制遠端檔案擷取

配接器將遠端目錄和檔案名稱放在標頭中(分別為 FileHeaders.REMOTE_DIRECTORYFileHeaders.REMOTE_FILE)。從 5.0 版開始,FileHeaders.REMOTE_FILE_INFO 標頭提供額外的遠端檔案資訊(以 JSON 格式)。如果您在 SftpStreamingMessageSource 上將 fileInfoJson 屬性設定為 false,則標頭會包含 SftpFileInfo 物件。您可以使用 SftpFileInfo.getFileInfo() 方法存取底層 SftpClient 提供的 SftpClient.DirEntry 物件。當您使用 XML 設定時,fileInfoJson 屬性不可用,但您可以透過將 SftpStreamingMessageSource 注入到您的設定類別之一來設定它。另請參閱 遠端檔案資訊

使用 Java 設定進行設定

以下 Spring Boot 應用程式展示了如何使用 Java 設定輸入配接器的範例

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("sftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + '/' +  headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

請注意,在此範例中,轉換器下游的訊息處理器具有一個 advice,可在處理後移除遠端檔案。