SFTP 接收通道适配器

SFTP 接收通道适配器是一個特殊的監聽器,它連接到伺服器並監聽遠端目錄事件(例如建立新檔案),此時它會啟動檔案傳輸。以下範例示範如何配置 SFTP 接收通道适配器

<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
              session-factory="sftpSessionFactory"
            channel="requestChannel"
            filename-pattern="*.txt"
            remote-directory="/foo/bar"
            preserve-timestamp="true"
            local-directory="file:target/foo"
            auto-create-local-directory="true"
            local-filename-generator-expression="#this.toUpperCase() + '.a'"
            scanner="myDirScanner"
            local-filter="myFilter"
            temporary-file-suffix=".writing"
            max-fetch-size="-1"
            delete-remote-files="false">
        <int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>

先前的配置範例示範如何為各種屬性提供值,包括以下各項

  • local-directory:檔案將傳輸到的位置

  • remote-directory:將從中傳輸檔案的遠端來源目錄

  • session-factory:對我們先前配置的 bean 的參考

預設情況下,傳輸的檔案與原始檔案同名。如果您想要覆寫此行為,您可以設定 local-filename-generator-expression 屬性,讓您提供 SpEL 表达式來產生本機檔案的名稱。與輸出閘道和适配器不同,在輸出閘道和适配器中,SpEL 評估內容的根物件是 Message,此接收适配器在評估時還沒有訊息,因為這是它最終使用傳輸的檔案作為其 payload 所產生的內容。因此,SpEL 評估內容的根物件是遠端檔案的原始名稱 (String)。

接收通道适配器首先將檔案擷取到本機目錄,然後根據 poller 配置發出每個檔案。從 5.0 版開始,您可以限制從 SFTP 伺服器擷取的檔案數量,以便在需要擷取新檔案時使用。當目標檔案很大或在具有持久檔案列表篩選器的叢集系統中執行時,這可能很有用,稍後將在本節中討論。使用 max-fetch-size 來達到此目的。負值(預設值)表示沒有限制,並且擷取所有符合條件的檔案。如需更多資訊,請參閱 接收通道适配器:控制遠端檔案擷取。從 5.0 版開始,您也可以透過設定 scanner 屬性,為 inbound-channel-adapter 提供自訂 DirectoryScanner 實作。

從 Spring Integration 3.0 開始,您可以指定 preserve-timestamp 屬性(預設值為 false)。當為 true 時,本機檔案的修改時間戳記會設定為從伺服器擷取的值。否則,它會設定為目前時間。

從 4.2 版開始,您可以指定 remote-directory-expression 而不是 remote-directory,這可讓您在每次輪詢時動態判斷目錄 — 例如,remote-directory-expression="@myBean.determineRemoteDir()"

有時,基於透過 filename-pattern 屬性指定的簡單模式的檔案篩選可能不足。如果屬於這種情況,您可以使用 filename-regex 屬性來指定正則表達式(例如,filename-regex=".*\.test$")。如果您需要完全控制,您可以使用 filter 屬性來提供對 org.springframework.integration.file.filters.FileListFilter 自訂實作的參考,這是一個用於篩選檔案列表的策略介面。此篩選器決定要擷取哪些遠端檔案。您也可以將基於模式的篩選器與其他篩選器(例如 AcceptOnceFileListFilter,以避免同步先前已擷取的檔案)結合使用,方法是使用 CompositeFileListFilter

AcceptOnceFileListFilter 將其狀態儲存在記憶體中。如果您希望狀態在系統重新啟動後仍然存在,請考慮改用 SftpPersistentAcceptOnceFileListFilter。此篩選器將接受的檔案名稱儲存在 MetadataStore 策略的實例中(請參閱 元數據儲存區)。此篩選器會比對檔案名稱和遠端修改時間。

從 4.0 版開始,此篩選器需要 ConcurrentMetadataStore。當與共用資料儲存區(例如使用 RedisMetadataStoreRedis)搭配使用時,這可讓篩選器金鑰在多個應用程式或伺服器實例之間共用。

從 5.0 版開始,SftpPersistentAcceptOnceFileListFilter 與記憶體內 SimpleMetadataStore 預設會應用於 SftpInboundFileSynchronizer。此篩選器也會與 XML 配置中的 regexpattern 選項以及 Java DSL 中的 SftpInboundChannelAdapterSpec 一起套用。您可以使用 CompositeFileListFilter (或 ChainFileListFilter) 來處理任何其他使用案例。

以上討論是指在擷取檔案之前篩選檔案。擷取檔案後,會將額外的篩選器套用至檔案系統上的檔案。預設情況下,這是 `AcceptOnceFileListFilter`,如本節所述,它會在記憶體中保留狀態,並且不會考慮檔案的修改時間。除非您的應用程式在處理後移除檔案,否則在應用程式重新啟動後,适配器預設會重新處理磁碟上的檔案。

此外,如果您配置 filter 以使用 SftpPersistentAcceptOnceFileListFilter,並且遠端檔案時間戳記變更(導致重新擷取),預設本機篩選器不允許處理此新檔案。

如需有關此篩選器及其使用方式的更多資訊,請參閱 遠端持久檔案列表篩選器

您可以使用 local-filter 屬性來配置本機檔案系統篩選器的行為。從 4.3.8 版開始,預設會配置 FileSystemPersistentAcceptOnceFileListFilter。此篩選器將接受的檔案名稱和修改時間戳記儲存在 MetadataStore 策略的實例中(請參閱 元數據儲存區),並偵測本機檔案修改時間的變更。預設 MetadataStoreSimpleMetadataStore,它會在記憶體中儲存狀態。

從 4.1.5 版開始,這些篩選器有一個名為 flushOnUpdate 的新屬性,這會導致它們在每次更新時刷新元數據儲存區(如果儲存區實作 Flushable)。

此外,如果您使用分散式 MetadataStore(例如 Redis 元數據儲存區),您可以擁有相同适配器或應用程式的多個實例,並確保只有一個實例處理檔案。

實際的本機篩選器是 CompositeFileListFilter,其中包含提供的篩選器和模式篩選器,可防止處理正在下載的檔案(基於 temporary-file-suffix)。檔案會使用此字尾下載(預設值為 .writing),並且檔案會在傳輸完成時重新命名為其最終名稱,使其對篩選器「可見」。

如需有關這些屬性的更多詳細資訊,請參閱 schema

SFTP 接收通道适配器是一個輪詢消費者。因此,您必須配置 poller(全域預設值或本機元素)。一旦檔案傳輸到本機目錄,就會產生以 java.io.File 作為其 payload 類型的訊息,並傳送到由 channel 屬性識別的通道。

從 6.2 版開始,您可以使用 SftpLastModifiedFileListFilter 根據上次修改策略篩選 SFTP 檔案。可以為此篩選器配置 age 屬性,以便只有早於此值的檔案才會通過篩選器。age 預設為 60 秒,但您應該選擇一個足夠大的 age,以避免過早地選取檔案(例如,由於網路故障)。請查看其 Javadoc 以取得更多資訊。

關於檔案篩選和大型檔案的更多資訊

有時,剛出現在監控(遠端)目錄中的檔案不完整。通常,此類檔案會使用某些臨時副檔名寫入(例如,名為 something.txt.writing 的檔案上的 .writing),然後在寫入過程完成後重新命名。在大多數情況下,開發人員只對完整的檔案感興趣,並且希望僅篩選這些檔案。為了處理這些情況,您可以使用 filename-patternfilename-regexfilter 屬性提供的篩選支援。如果您需要自訂篩選器實作,您可以透過設定 filter 屬性在您的适配器中包含參考。以下範例示範如何執行此操作

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
            channel="receiveChannel"
            session-factory="sftpSessionFactory"
            filter="customFilter"
            local-directory="file:/local-test-dir"
            remote-directory="/remote-test-dir">
        <int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>

<bean id="customFilter" class="org.foo.CustomFilter"/>

從失敗中復原

您應該了解适配器的架構。檔案同步器擷取檔案,而 FileReadingMessageSource 會為每個同步的檔案發出訊息。如 先前討論,涉及兩個篩選器。filter 屬性(和模式)指的是遠端 (SFTP) 檔案列表,以避免擷取已擷取的檔案。FileReadingMessageSource 使用 local-filter 來判斷要作為訊息傳送的檔案。

同步器列出遠端檔案並諮詢其篩選器。然後傳輸檔案。如果在檔案傳輸期間發生 IO 錯誤,則會移除已新增至篩選器的任何檔案,以便它們可以在下次輪詢時重新擷取。這僅適用於篩選器實作 ReversibleFileListFilter(例如 AcceptOnceFileListFilter)的情況。

如果在同步檔案之後,下游流程處理檔案時發生錯誤,則不會發生篩選器的自動回滾,因此預設不會重新處理失敗的檔案。

如果您希望在失敗後重新處理此類檔案,您可以使用類似於以下的配置來促進從篩選器中移除失敗的檔案

<int-sftp:inbound-channel-adapter id="sftpAdapter"
        session-factory="sftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/sftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

先前的配置適用於任何 ResettableFileListFilter

從 5.0 版開始,接收通道适配器可以在本機建立子目錄,根據產生的本機檔案名稱。這也可以是遠端子路徑。為了能夠以遞迴方式讀取本機目錄以根據階層支援進行修改,您現在可以使用新的 RecursiveDirectoryScanner(基於 Files.walk() 演算法)來提供內部 FileReadingMessageSource。如需更多資訊,請參閱 AbstractInboundFileSynchronizingMessageSource.setScanner()。此外,您現在可以使用 setUseWatchService() 選項將 AbstractInboundFileSynchronizingMessageSource 切換到基於 WatchServiceDirectoryScanner。它也針對所有 WatchEventType 實例進行配置,以對本機目錄中的任何修改做出反應。先前顯示的重新處理範例是基於 FileReadingMessageSource.WatchServiceDirectoryScanner 的內建功能,當從本機目錄中刪除檔案時(StandardWatchEventKinds.ENTRY_DELETE),它會使用 ResettableFileListFilter.remove()。如需更多資訊,請參閱 WatchServiceDirectoryScanner

使用 Java 配置進行配置

以下 Spring Boot 應用程式示範如何使用 Java 配置接收适配器

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 進行配置

以下 Spring Boot 應用程式示範如何使用 Java DSL 配置接收适配器

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlow
            .from(Sftp.inboundAdapter(this.sftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilenameExpression("#this.toUpperCase() + '.a'")
                    .localDirectory(new File("sftp-inbound")),
                 e -> e.id("sftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

處理不完整資料

請參閱 處理不完整資料

提供 SftpSystemMarkerFilePresentFileListFilter 來篩選遠端系統上沒有對應標記檔案的遠端檔案。如需配置資訊,請參閱 Javadoc