FTP 接收通道配接器

FTP 接收通道配接器是一種特殊的監聽器,它連接到 FTP 伺服器並監聽遠端目錄事件(例如,新檔案建立),在這種情況下,它會啟動檔案傳輸。以下範例顯示如何組態 inbound-channel-adapter

<int-ftp:inbound-channel-adapter id="ftpInbound"
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    auto-create-local-directory="true"
    delete-remote-files="true"
    filename-pattern="*.txt"
    remote-directory="some/remote/path"
    remote-file-separator="/"
    preserve-timestamp="true"
    local-filename-generator-expression="#this.toUpperCase() + '.a'"
    scanner="myDirScanner"
    local-filter="myFilter"
    temporary-file-suffix=".writing"
    max-fetch-size="-1"
    local-directory=".">
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

如前面的組態所示,您可以使用 inbound-channel-adapter 元素來組態 FTP 接收通道配接器,同時也為各種屬性提供值,例如 local-directoryfilename-pattern(它基於簡單的模式匹配,而不是正則表達式)以及 session-factory 的參考。

預設情況下,傳輸的檔案與原始檔案同名。如果您想要覆寫此行為,您可以設定 local-filename-generator-expression 屬性,讓您可以提供 SpEL 運算式來產生本機檔案的名稱。與傳送閘道和配接器不同,在這些閘道和配接器中,SpEL 評估內容的根物件是 Message,此接收配接器在評估時尚未擁有消息,因為這就是它最終使用傳輸的檔案作為其負載所產生的內容。因此,SpEL 評估內容的根物件是遠端檔案的原始名稱 (String)。

接收通道配接器首先擷取本機目錄的 File 物件,然後根據輪詢器組態發出每個檔案。從 5.0 版開始,您現在可以限制從 FTP 伺服器擷取的檔案數量(當需要新的檔案擷取時)。當目標檔案非常大,或者當您在叢集系統中執行具有持久性檔案清單過濾器(稍後討論)時,這可能很有利。使用 max-fetch-size 來達到此目的。負值(預設值)表示沒有限制,並且會擷取所有相符的檔案。如需更多資訊,請參閱 接收通道配接器:控制遠端檔案擷取。從 5.0 版開始,您也可以透過設定 scanner 屬性,為 inbound-channel-adapter 提供自訂 DirectoryScanner 實作。

從 Spring Integration 3.0 開始,您可以指定 preserve-timestamp 屬性(其預設值為 false)。當為 true 時,本機檔案的修改時間戳記會設定為從伺服器擷取的值。否則,它會設定為目前時間。

從 4.2 版開始,您可以指定 remote-directory-expression 而不是 remote-directory,讓您可以動態決定每次輪詢的目錄,例如 remote-directory-expression="@myBean.determineRemoteDir()"

從 4.3 版開始,您可以省略 remote-directoryremote-directory-expression 屬性。它們預設為 null。在這種情況下,根據 FTP 協定,用戶端工作目錄會用作預設遠端目錄。

有時,基於 filename-pattern 屬性指定的簡單模式的檔案過濾可能不足以滿足需求。如果是這種情況,您可以使用 filename-regex 屬性來指定正則表達式(例如 filename-regex=".*\.test$")。此外,如果您需要完全控制,您可以使用 filter 屬性,並提供對 o.s.i.file.filters.FileListFilter 任何自訂實作的參考,這是一個用於過濾檔案清單的策略介面。此過濾器決定要擷取哪些遠端檔案。您也可以透過使用 CompositeFileListFilter,將基於模式的過濾器與其他過濾器(例如 AcceptOnceFileListFilter,以避免同步先前已擷取的檔案)結合使用。

AcceptOnceFileListFilter 將其狀態儲存在記憶體中。如果您希望狀態在系統重新啟動後仍然存在,請考慮改用 FtpPersistentAcceptOnceFileListFilter。此過濾器將接受的檔案名稱儲存在 MetadataStore 策略的執行個體中(請參閱 元數據儲存庫)。此過濾器會比對檔案名稱和遠端修改時間。

從 4.0 版開始,此過濾器需要 ConcurrentMetadataStore。當與共用資料儲存庫(例如具有 RedisMetadataStoreRedis)一起使用時,它允許跨多個應用程式或伺服器執行個體共用過濾器金鑰。

從 5.0 版開始,FtpPersistentAcceptOnceFileListFilter 與記憶體內 SimpleMetadataStore 預設會應用於 FtpInboundFileSynchronizer。此過濾器也會與 XML 組態中的 regexpattern 選項以及 Java DSL 中的 FtpInboundChannelAdapterSpec 一起應用。任何其他使用案例都可以使用 CompositeFileListFilter(或 ChainFileListFilter)進行管理。

前面的討論是指在擷取檔案之前過濾檔案。擷取檔案後,會將額外的過濾器套用至檔案系統上的檔案。預設情況下,這是一個 AcceptOnceFileListFilter,如先前討論的,它會將狀態保留在記憶體中,並且不會考慮檔案的修改時間。除非您的應用程式在處理後移除檔案,否則在應用程式重新啟動後,配接器預設會重新處理磁碟上的檔案。

此外,如果您組態 filter 以使用 FtpPersistentAcceptOnceFileListFilter,並且遠端檔案時間戳記變更(導致重新擷取),預設的本機過濾器不會讓此新檔案被處理。

如需關於此過濾器及其使用方式的更多資訊,請參閱 遠端持久性檔案清單過濾器

您可以使用 local-filter 屬性來組態本機檔案系統過濾器的行為。從 4.3.8 版開始,預設會組態 FileSystemPersistentAcceptOnceFileListFilter。此過濾器將接受的檔案名稱和修改時間戳記儲存在 MetadataStore 策略的執行個體中(請參閱 元數據儲存庫),並偵測本機檔案修改時間的變更。預設的 MetadataStoreSimpleMetadataStore,它將狀態儲存在記憶體中。

從 4.1.5 版開始,這些過濾器有一個新的屬性 (flushOnUpdate),這會導致它們在每次更新時刷新元數據儲存庫(如果儲存庫實作 Flushable)。

此外,如果您使用分散式 MetadataStore(例如 Redis),您可以擁有相同配接器或應用程式的多個執行個體,並確保每個檔案僅處理一次。

實際的本機過濾器是一個 CompositeFileListFilter,它包含提供的過濾器和一個模式過濾器,該模式過濾器可防止處理正在下載的檔案(基於 temporary-file-suffix)。檔案會使用此後綴(預設值為 .writing)下載,並且當傳輸完成時,檔案會重新命名為其最終名稱,使其對過濾器「可見」。

如果預設的 '/' 不適用於您的特定環境,則 remote-file-separator 屬性可讓您組態要使用的檔案分隔符字元。

如需關於這些屬性的更多詳細資訊,請參閱 schema

您也應該瞭解 FTP 接收通道配接器是一個輪詢消費者。因此,您必須組態輪詢器(透過使用全域預設值或本機子元素)。檔案傳輸後,會產生一個以 java.io.File 作為其負載的消息,並傳送到由 channel 屬性識別的通道。

從 6.2 版開始,您可以使用 FtpLastModifiedFileListFilter,根據上次修改策略來過濾 FTP 檔案。可以使用 age 屬性組態此過濾器,以便只有早於此值的檔案才會通過過濾器。age 預設為 60 秒,但您應該選擇一個足夠大的 age,以避免過早地選取檔案(例如,由於網路故障)。請查看其 Javadoc 以取得更多資訊。

關於檔案過濾和不完整檔案的更多資訊

有時,剛出現在監控(遠端)目錄中的檔案並不完整。通常,此類檔案會使用臨時副檔名(例如 somefile.txt.writing)寫入,然後在寫入程序完成後重新命名。在大多數情況下,您只對完整的檔案感興趣,並且想要僅針對完整的檔案進行過濾。若要處理這些情況,您可以使用 filename-patternfilename-regexfilter 屬性提供的過濾支援。以下範例使用自訂過濾器實作

<int-ftp:inbound-channel-adapter
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    filter="customFilter"
    local-directory="file:/my_transfers">
    remote-directory="some/remote/path"
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

<bean id="customFilter" class="org.example.CustomFilter"/>

接收 FTP 配接器的輪詢器組態注意事項

接收 FTP 配接器的工作包含兩個任務

  1. 與遠端伺服器通訊,以便將檔案從遠端目錄傳輸到本機目錄。

  2. 對於每個傳輸的檔案,產生一個以該檔案作為負載的消息,並將其傳送到由 'channel' 屬性識別的通道。這就是為什麼它們被稱為「通道配接器」而不是僅僅「配接器」。此類配接器的主要工作是產生要傳送到消息通道的消息。從本質上講,第二個任務具有優先權,因此,如果您的本機目錄已經有一個或多個檔案,它會首先從這些檔案產生消息。只有當所有本機檔案都已處理完畢後,它才會啟動遠端通訊以擷取更多檔案。

此外,當在輪詢器上組態觸發器時,您應該密切注意 max-messages-per-poll 屬性。對於所有 SourcePollingChannelAdapter 執行個體(包括 FTP),其預設值為 1。這表示,一旦處理完一個檔案,它就會等待下一個執行時間(由您的觸發器組態決定)。如果您碰巧在本機目錄中有一個或多個檔案,它會在啟動與遠端 FTP 伺服器的通訊之前處理這些檔案。此外,如果 max-messages-per-poll 設定為 1(預設值),它每次只處理一個檔案,間隔由您的觸發器定義,本質上是作為「一次輪詢 === 一個檔案」運作。

對於典型的檔案傳輸使用案例,您很可能想要相反的行為:針對每次輪詢處理您可以處理的所有檔案,然後才等待下一次輪詢。如果是這種情況,請將 max-messages-per-poll 設定為 -1。然後,在每次輪詢時,配接器會嘗試產生盡可能多的消息。換句話說,它會處理本機目錄中的所有內容,然後連接到遠端目錄以傳輸所有可在該處處理的內容。只有在這樣做之後,輪詢作業才被視為完成,並且輪詢器會等待下一個執行時間。

或者,您可以將 'max-messages-per-poll' 值設定為正值,表示每次輪詢從檔案建立的消息的上限。例如,值為 10 表示,在每次輪詢時,它會嘗試處理不超過十個檔案。

從失敗中恢復

瞭解配接器的架構非常重要。有一個檔案同步器擷取檔案,以及一個 FileReadingMessageSource,它為每個同步的檔案發出一個消息。如先前討論的,涉及兩個過濾器。filter 屬性(和模式)是指遠端 (FTP) 檔案清單,以避免擷取已擷取的檔案。local-filterFileReadingMessageSource 用於決定哪些檔案要作為消息傳送。

同步器列出遠端檔案並諮詢其過濾器。然後傳輸檔案。如果在檔案傳輸期間發生 IO 錯誤,則會移除已新增至過濾器的任何檔案,以便它們有資格在下一次輪詢時重新擷取。這僅適用於過濾器實作 ReversibleFileListFilter(例如 AcceptOnceFileListFilter)的情況。

如果在同步檔案後,下游流程處理檔案時發生錯誤,則不會發生過濾器的自動回滾,因此預設不會重新處理失敗的檔案。

如果您希望在失敗後重新處理此類檔案,您可以使用類似於以下的組態來協助從過濾器中移除失敗的檔案

<int-ftp:inbound-channel-adapter id="ftpAdapter"
        session-factory="ftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/ftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-ftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

上述組態適用於任何 ResettableFileListFilter

從 5.0 版開始,接收通道配接器可以在本機建立與產生的本機檔案名稱對應的子目錄。那也可能是遠端子路徑。為了能夠以遞迴方式讀取本機目錄以進行修改(根據階層支援),您現在可以為內部 FileReadingMessageSource 提供基於 Files.walk() 演算法的新 RecursiveDirectoryScanner。如需更多資訊,請參閱 AbstractInboundFileSynchronizingMessageSource.setScanner()。此外,您現在可以透過使用 setUseWatchService() 選項,將 AbstractInboundFileSynchronizingMessageSource 切換到基於 WatchServiceDirectoryScanner。它也針對所有 WatchEventType 執行個體進行組態,以對本機目錄中的任何修改做出反應。先前顯示的重新處理範例是基於 FileReadingMessageSource.WatchServiceDirectoryScanner 的內建功能,以便在從本機目錄中刪除檔案 (StandardWatchEventKinds.ENTRY_DELETE) 時執行 ResettableFileListFilter.remove()。如需更多資訊,請參閱 WatchServiceDirectoryScanner

使用 Java 組態進行組態

以下 Spring Boot 應用程式顯示如何使用 Java 組態組態接收配接器的範例

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        sf.setTestSession(true);
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
        FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> ftpMessageSource() {
        FtpInboundFileSynchronizingMessageSource source =
                new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("ftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 進行組態

以下 Spring Boot 應用程式顯示如何使用 Java DSL 組態接收配接器的範例

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow ftpInboundFlow() {
        return IntegrationFlow
            .from(Ftp.inboundAdapter(this.ftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilename(f -> f.toUpperCase() + ".a")
                    .localDirectory(new File("d:\\ftp_files")),
                e -> e.id("ftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

處理不完整資料

請參閱 處理不完整資料

提供 FtpSystemMarkerFilePresentFileListFilter 是為了過濾遠端系統上沒有對應標記檔案的遠端檔案。如需組態資訊,請參閱 Javadoc(並瀏覽至父類別)。