SFTP 接收通道适配器
SFTP 接收通道适配器是一個特殊的監聽器,它連接到伺服器並監聽遠端目錄事件(例如建立新檔案),此時它會啟動檔案傳輸。以下範例示範如何配置 SFTP 接收通道适配器
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
先前的配置範例示範如何為各種屬性提供值,包括以下各項
-
local-directory
:檔案將傳輸到的位置 -
remote-directory
:將從中傳輸檔案的遠端來源目錄 -
session-factory
:對我們先前配置的 bean 的參考
預設情況下,傳輸的檔案與原始檔案同名。如果您想要覆寫此行為,您可以設定 local-filename-generator-expression
屬性,讓您提供 SpEL 表达式來產生本機檔案的名稱。與輸出閘道和适配器不同,在輸出閘道和适配器中,SpEL 評估內容的根物件是 Message
,此接收适配器在評估時還沒有訊息,因為這是它最終使用傳輸的檔案作為其 payload 所產生的內容。因此,SpEL 評估內容的根物件是遠端檔案的原始名稱 (String
)。
接收通道适配器首先將檔案擷取到本機目錄,然後根據 poller 配置發出每個檔案。從 5.0 版開始,您可以限制從 SFTP 伺服器擷取的檔案數量,以便在需要擷取新檔案時使用。當目標檔案很大或在具有持久檔案列表篩選器的叢集系統中執行時,這可能很有用,稍後將在本節中討論。使用 max-fetch-size
來達到此目的。負值(預設值)表示沒有限制,並且擷取所有符合條件的檔案。如需更多資訊,請參閱 接收通道适配器:控制遠端檔案擷取。從 5.0 版開始,您也可以透過設定 scanner
屬性,為 inbound-channel-adapter
提供自訂 DirectoryScanner
實作。
從 Spring Integration 3.0 開始,您可以指定 preserve-timestamp
屬性(預設值為 false
)。當為 true
時,本機檔案的修改時間戳記會設定為從伺服器擷取的值。否則,它會設定為目前時間。
從 4.2 版開始,您可以指定 remote-directory-expression
而不是 remote-directory
,這可讓您在每次輪詢時動態判斷目錄 — 例如,remote-directory-expression="@myBean.determineRemoteDir()"
。
有時,基於透過 filename-pattern
屬性指定的簡單模式的檔案篩選可能不足。如果屬於這種情況,您可以使用 filename-regex
屬性來指定正則表達式(例如,filename-regex=".*\.test$"
)。如果您需要完全控制,您可以使用 filter
屬性來提供對 org.springframework.integration.file.filters.FileListFilter
自訂實作的參考,這是一個用於篩選檔案列表的策略介面。此篩選器決定要擷取哪些遠端檔案。您也可以將基於模式的篩選器與其他篩選器(例如 AcceptOnceFileListFilter
,以避免同步先前已擷取的檔案)結合使用,方法是使用 CompositeFileListFilter
。
AcceptOnceFileListFilter
將其狀態儲存在記憶體中。如果您希望狀態在系統重新啟動後仍然存在,請考慮改用 SftpPersistentAcceptOnceFileListFilter
。此篩選器將接受的檔案名稱儲存在 MetadataStore
策略的實例中(請參閱 元數據儲存區)。此篩選器會比對檔案名稱和遠端修改時間。
從 4.0 版開始,此篩選器需要 ConcurrentMetadataStore
。當與共用資料儲存區(例如使用 RedisMetadataStore
的 Redis
)搭配使用時,這可讓篩選器金鑰在多個應用程式或伺服器實例之間共用。
從 5.0 版開始,SftpPersistentAcceptOnceFileListFilter
與記憶體內 SimpleMetadataStore
預設會應用於 SftpInboundFileSynchronizer
。此篩選器也會與 XML 配置中的 regex
或 pattern
選項以及 Java DSL 中的 SftpInboundChannelAdapterSpec
一起套用。您可以使用 CompositeFileListFilter
(或 ChainFileListFilter
) 來處理任何其他使用案例。
以上討論是指在擷取檔案之前篩選檔案。擷取檔案後,會將額外的篩選器套用至檔案系統上的檔案。預設情況下,這是 `AcceptOnceFileListFilter`,如本節所述,它會在記憶體中保留狀態,並且不會考慮檔案的修改時間。除非您的應用程式在處理後移除檔案,否則在應用程式重新啟動後,适配器預設會重新處理磁碟上的檔案。
此外,如果您配置 filter
以使用 SftpPersistentAcceptOnceFileListFilter
,並且遠端檔案時間戳記變更(導致重新擷取),預設本機篩選器不允許處理此新檔案。
如需有關此篩選器及其使用方式的更多資訊,請參閱 遠端持久檔案列表篩選器。
您可以使用 local-filter
屬性來配置本機檔案系統篩選器的行為。從 4.3.8 版開始,預設會配置 FileSystemPersistentAcceptOnceFileListFilter
。此篩選器將接受的檔案名稱和修改時間戳記儲存在 MetadataStore
策略的實例中(請參閱 元數據儲存區),並偵測本機檔案修改時間的變更。預設 MetadataStore
是 SimpleMetadataStore
,它會在記憶體中儲存狀態。
從 4.1.5 版開始,這些篩選器有一個名為 flushOnUpdate
的新屬性,這會導致它們在每次更新時刷新元數據儲存區(如果儲存區實作 Flushable
)。
此外,如果您使用分散式 MetadataStore (例如 Redis 元數據儲存區),您可以擁有相同适配器或應用程式的多個實例,並確保只有一個實例處理檔案。 |
實際的本機篩選器是 CompositeFileListFilter
,其中包含提供的篩選器和模式篩選器,可防止處理正在下載的檔案(基於 temporary-file-suffix
)。檔案會使用此字尾下載(預設值為 .writing
),並且檔案會在傳輸完成時重新命名為其最終名稱,使其對篩選器「可見」。
如需有關這些屬性的更多詳細資訊,請參閱 schema。
SFTP 接收通道适配器是一個輪詢消費者。因此,您必須配置 poller(全域預設值或本機元素)。一旦檔案傳輸到本機目錄,就會產生以 java.io.File
作為其 payload 類型的訊息,並傳送到由 channel
屬性識別的通道。
從 6.2 版開始,您可以使用 SftpLastModifiedFileListFilter
根據上次修改策略篩選 SFTP 檔案。可以為此篩選器配置 age
屬性,以便只有早於此值的檔案才會通過篩選器。age 預設為 60 秒,但您應該選擇一個足夠大的 age,以避免過早地選取檔案(例如,由於網路故障)。請查看其 Javadoc 以取得更多資訊。
關於檔案篩選和大型檔案的更多資訊
有時,剛出現在監控(遠端)目錄中的檔案不完整。通常,此類檔案會使用某些臨時副檔名寫入(例如,名為 something.txt.writing
的檔案上的 .writing
),然後在寫入過程完成後重新命名。在大多數情況下,開發人員只對完整的檔案感興趣,並且希望僅篩選這些檔案。為了處理這些情況,您可以使用 filename-pattern
、filename-regex
和 filter
屬性提供的篩選支援。如果您需要自訂篩選器實作,您可以透過設定 filter
屬性在您的适配器中包含參考。以下範例示範如何執行此操作
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
從失敗中復原
您應該了解适配器的架構。檔案同步器擷取檔案,而 FileReadingMessageSource
會為每個同步的檔案發出訊息。如 先前討論,涉及兩個篩選器。filter
屬性(和模式)指的是遠端 (SFTP) 檔案列表,以避免擷取已擷取的檔案。FileReadingMessageSource
使用 local-filter
來判斷要作為訊息傳送的檔案。
同步器列出遠端檔案並諮詢其篩選器。然後傳輸檔案。如果在檔案傳輸期間發生 IO 錯誤,則會移除已新增至篩選器的任何檔案,以便它們可以在下次輪詢時重新擷取。這僅適用於篩選器實作 ReversibleFileListFilter
(例如 AcceptOnceFileListFilter
)的情況。
如果在同步檔案之後,下游流程處理檔案時發生錯誤,則不會發生篩選器的自動回滾,因此預設不會重新處理失敗的檔案。
如果您希望在失敗後重新處理此類檔案,您可以使用類似於以下的配置來促進從篩選器中移除失敗的檔案
<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
先前的配置適用於任何 ResettableFileListFilter
。
從 5.0 版開始,接收通道适配器可以在本機建立子目錄,根據產生的本機檔案名稱。這也可以是遠端子路徑。為了能夠以遞迴方式讀取本機目錄以根據階層支援進行修改,您現在可以使用新的 RecursiveDirectoryScanner
(基於 Files.walk()
演算法)來提供內部 FileReadingMessageSource
。如需更多資訊,請參閱 AbstractInboundFileSynchronizingMessageSource.setScanner()
。此外,您現在可以使用 setUseWatchService()
選項將 AbstractInboundFileSynchronizingMessageSource
切換到基於 WatchService
的 DirectoryScanner
。它也針對所有 WatchEventType
實例進行配置,以對本機目錄中的任何修改做出反應。先前顯示的重新處理範例是基於 FileReadingMessageSource.WatchServiceDirectoryScanner
的內建功能,當從本機目錄中刪除檔案時(StandardWatchEventKinds.ENTRY_DELETE
),它會使用 ResettableFileListFilter.remove()
。如需更多資訊,請參閱 WatchServiceDirectoryScanner
。
使用 Java 配置進行配置
以下 Spring Boot 應用程式示範如何使用 Java 配置接收适配器
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 進行配置
以下 Spring Boot 應用程式示範如何使用 Java DSL 配置接收适配器
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}