FTP 串流輸入通道配接器
版本 4.3 引入了串流輸入通道配接器。此配接器產生帶有 InputStream
類型有效負載的訊息,允許在不寫入本機檔案系統的情況下提取檔案。由於工作階段保持開啟狀態,因此使用應用程式負責在檔案被使用後關閉工作階段。工作階段在 closeableResource
標頭 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
) 中提供。標準框架元件,例如 FileSplitter
和 StreamTransformer
,會自動關閉工作階段。請參閱 檔案分割器 和 串流轉換器 以取得關於這些元件的更多資訊。以下範例顯示如何設定 inbound-streaming-channel-adapter
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
僅允許使用 filename-pattern
、filename-regex
、filter
或 filter-expression
其中之一。
從版本 5.0 開始,預設情況下,FtpStreamingMessageSource 配接器會使用基於記憶體內 SimpleMetadataStore 的 FtpPersistentAcceptOnceFileListFilter 來防止遠端檔案重複。預設情況下,此篩選器也適用於檔案名稱模式(或 regex)。如果您需要允許重複,可以使用 AcceptAllFileListFilter 。任何其他用例都可以由 CompositeFileListFilter (或 ChainFileListFilter )處理。Java 設定 (稍後在文件中) 展示了一種在處理後移除遠端檔案以避免重複的技術。 |
有關 FtpPersistentAcceptOnceFileListFilter
及其使用方式的更多資訊,請參閱 遠端持久檔案列表篩選器。
當需要提取時,使用 max-fetch-size
屬性來限制每次輪詢提取的檔案數量。在叢集環境中執行時,將其設定為 1
並使用持久性篩選器。請參閱 輸入通道配接器:控制遠端檔案提取 以取得更多資訊。
配接器將遠端目錄和檔案名稱分別放在 FileHeaders.REMOTE_DIRECTORY
和 FileHeaders.REMOTE_FILE
標頭中。從版本 5.0 開始,FileHeaders.REMOTE_FILE_INFO
標頭提供額外的遠端檔案資訊(預設以 JSON 表示)。如果您將 FtpStreamingMessageSource
上的 fileInfoJson
屬性設定為 false
,則標頭包含 FtpFileInfo
物件。可以使用 FtpFileInfo.getFileInfo()
方法存取底層 Apache Net 程式庫提供的 FTPFile
物件。當您使用 XML 設定時,fileInfoJson
屬性不可用,但您可以透過將 FtpStreamingMessageSource
注入到您的設定類別之一來設定它。另請參閱 遠端檔案資訊。
從版本 5.1 開始,comparator
的泛型類型為 FTPFile
。先前,它是 AbstractFileInfo<FTPFile>
。這是因為排序現在在處理的早期執行,在篩選和應用 maxFetch
之前。
使用 Java 設定進行設定
以下 Spring Boot 應用程式顯示如何使用 Java 設定設定輸入配接器的範例
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
請注意,在本範例中,轉換器下游的訊息處理器具有一個 advice
,可在處理後移除遠端檔案。