讀取檔案
FileReadingMessageSource
可用於從檔案系統取用檔案。這是 MessageSource
的實作,可從檔案系統目錄建立訊息。以下範例顯示如何設定 FileReadingMessageSource
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
為了防止為特定檔案建立訊息,您可以提供 FileListFilter
。預設情況下,我們使用以下篩選器
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
IgnoreHiddenFileListFilter
確保不會處理隱藏檔案。請注意,隱藏的確切定義取決於系統。例如,在基於 UNIX 的系統上,以句點字元開頭的檔案被視為隱藏檔案。另一方面,Microsoft Windows 具有專用的檔案屬性來指示隱藏檔案。
版本 4.2 引入了 |
AcceptOnceFileListFilter
確保檔案僅從目錄中選取一次。
自版本 4.0 起,此篩選器需要 自版本 4.1.5 起,此篩選器有一個新的屬性 ( |
持久性檔案列表篩選器現在具有布林屬性 forRecursion
。將此屬性設定為 true
,也會設定 alwaysAcceptDirectories
,這表示輸出閘道 (ls
和 mget
) 上的遞迴操作現在將始終每次都遍歷完整的目錄樹。這是為了解決未偵測到目錄樹深處變更的問題。此外,forRecursion=true
會導致完整檔案路徑用作中繼資料儲存區金鑰;這解決了如果同名檔案多次出現在不同目錄中,篩選器無法正常運作的問題。重要事項:這表示在持久性中繼資料儲存區中的現有金鑰將不會為頂層目錄下的檔案找到。因此,預設情況下屬性為 false
;這可能會在未來的版本中變更。
以下範例設定具有篩選器的 FileReadingMessageSource
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
讀取檔案的常見問題是,檔案可能在準備就緒之前就被偵測到 (也就是說,某些其他程序可能仍在寫入檔案)。預設的 AcceptOnceFileListFilter
無法防止這種情況。在大多數情況下,如果檔案寫入程序在檔案準備好讀取後立即重新命名每個檔案,則可以防止這種情況。filename-pattern
或 filename-regex
篩選器僅接受已準備好的檔案 (可能基於已知的後綴),並與預設的 AcceptOnceFileListFilter
組合,允許這種情況。CompositeFileListFilter
啟用組合,如下列範例所示
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果無法使用臨時名稱建立檔案並重新命名為最終名稱,Spring Integration 提供了另一種替代方案。版本 4.2 新增了 LastModifiedFileListFilter
。可以為此篩選器設定 age
屬性,以便只有早於此值的檔案才會被篩選器通過。age 預設為 60 秒,但您應該選擇足夠大的 age,以避免過早選取檔案 (例如,由於網路故障)。以下範例顯示如何設定 LastModifiedFileListFilter
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
從版本 4.3.7 開始,引入了 ChainFileListFilter
(CompositeFileListFilter
的擴充功能),以允許後續篩選器僅應查看先前篩選器結果的場景。(使用 CompositeFileListFilter
,所有篩選器都會看到所有檔案,但它僅通過已通過所有篩選器的檔案)。需要新行為的一個範例是 LastModifiedFileListFilter
和 AcceptOnceFileListFilter
的組合,當我們不希望在經過一段時間後才接受檔案時。使用 CompositeFileListFilter
,由於 AcceptOnceFileListFilter
在第一次傳遞時看到所有檔案,因此當另一個篩選器執行時,它不會在稍後通過檔案。當模式篩選器與自訂篩選器組合使用時,CompositeFileListFilter
方法很有用,後者尋找輔助檔案以指示檔案傳輸已完成。模式篩選器可能僅通過主要檔案 (例如 something.txt
),但「完成」篩選器需要查看 (例如) something.done
是否存在。
假設我們有檔案 a.txt
、a.done
和 b.txt
。
模式篩選器僅通過 a.txt
和 b.txt
,而「完成」篩選器看到所有三個檔案,並且僅通過 a.txt
。複合篩選器的最終結果是僅釋放 a.txt
。
使用 ChainFileListFilter ,如果鏈中的任何篩選器傳回空列表,則不會調用其餘篩選器。 |
版本 5.0 引入了 ExpressionFileListFilter
,以針對檔案作為內容評估根物件執行 SpEL 運算式。為此,所有用於檔案處理 (本機和遠端) 的 XML 元件,以及現有的 filter
屬性,都提供了 filter-expression
選項,如下列範例所示
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
版本 5.0.5 引入了對被拒絕檔案感興趣的 DiscardAwareFileListFilter
實作。為此,應透過 addDiscardCallback(Consumer<File>)
為此類篩選器實作提供回呼。在框架中,此功能從 FileReadingMessageSource.WatchServiceDirectoryScanner
中使用,並與 LastModifiedFileListFilter
結合使用。與常規 DirectoryScanner
不同,WatchService
根據目標檔案系統上的事件提供要處理的檔案。在輪詢內部佇列中的這些檔案時,LastModifiedFileListFilter
可能會因為檔案相對於其設定的 age
而言太新而丟棄它們。因此,我們遺失了檔案以供未來可能考慮。丟棄回呼 Hook 讓我們將檔案保留在內部佇列中,以便在後續輪詢中檢查其 age
。CompositeFileListFilter
也實作了 DiscardAwareFileListFilter
,並將丟棄回呼填入其所有 DiscardAwareFileListFilter
委派。
由於 CompositeFileListFilter 比對所有委派的檔案,因此對於同一個檔案,可能會多次呼叫 discardCallback 。 |
從版本 5.1 開始,FileReadingMessageSource
不會檢查目錄是否存在,並且在呼叫其 start()
(通常透過包裝 SourcePollingChannelAdapter
) 之前不會建立目錄。先前,當參考目錄 (例如從測試中) 或稍後套用權限時,沒有簡單的方法可以防止作業系統權限錯誤。
訊息標頭
從版本 5.0 開始,FileReadingMessageSource
(除了作為輪詢 File
的 payload
之外) 將以下標頭填入輸出 Message
-
FileHeaders.FILENAME
:要傳送的檔案的File.getName()
。可用於後續重新命名或複製邏輯。 -
FileHeaders.ORIGINAL_FILE
:File
物件本身。通常,此標頭由框架元件 (例如 分割器 或 轉換器) 自動填入,當我們遺失原始File
物件時。但是,為了與任何其他自訂用例保持一致性和便利性,此標頭可用於存取原始檔案。 -
FileHeaders.RELATIVE_PATH
:引入的新標頭,用於表示相對於掃描根目錄的檔案路徑部分。當要求是在其他位置還原來源目錄階層時,此標頭可能很有用。為此,可以設定DefaultFileNameGenerator
(請參閱 "`產生檔案名稱) 以使用此標頭。
目錄掃描和輪詢
FileReadingMessageSource
不會立即為目錄中的檔案產生訊息。它使用內部佇列來存放 scanner
傳回的「合格檔案」。scanEachPoll
選項用於確保在每次輪詢時使用最新的輸入目錄內容刷新內部佇列。預設情況下 (scanEachPoll = false
),FileReadingMessageSource
在再次掃描目錄之前清空其佇列。此預設行為特別適用於減少掃描目錄中大量檔案的情況。但是,在需要自訂排序的情況下,務必考慮將此標誌設定為 true
的影響。處理檔案的順序可能與預期的不符。預設情況下,佇列中的檔案以其自然 (path
) 順序處理。掃描新增的新檔案,即使佇列已包含檔案,也會插入到適當的位置以維持該自然順序。若要自訂順序,FileReadingMessageSource
可以接受 Comparator<File>
作為建構子引數。內部 (PriorityBlockingQueue
) 使用它來根據業務需求重新排序其內容。因此,若要依特定順序處理檔案,您應該為 FileReadingMessageSource
提供比較器,而不是排序自訂 DirectoryScanner
產生的列表。
版本 5.0 引入了 RecursiveDirectoryScanner
以執行檔案樹狀結構訪問。實作基於 Files.walk(Path start, int maxDepth, FileVisitOption… options)
功能。根目錄 (DirectoryScanner.listFiles(File)
) 引數從結果中排除。所有其他子目錄包含和排除都基於目標 FileListFilter
實作。例如,SimplePatternFileListFilter
預設會篩選出目錄。有關更多資訊,請參閱 AbstractDirectoryAwareFileListFilter
及其實作。
從版本 5.5 開始,Java DSL 的 FileInboundChannelAdapterSpec 具有方便的 recursive(boolean) 選項,可在目標 FileReadingMessageSource 中使用 RecursiveDirectoryScanner 而不是預設的掃描器。 |
命名空間支援
可以使用檔案特定的命名空間來簡化檔案讀取的設定。若要執行此操作,請使用以下範本
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此命名空間中,您可以減少 FileReadingMessageSource
並將其包裝在輸入通道配接器中,如下所示
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一個通道配接器範例依賴預設的 FileListFilter
實作
-
IgnoreHiddenFileListFilter
(不處理隱藏檔案) -
AcceptOnceFileListFilter
(防止重複)
因此,您也可以省略 prevent-duplicates
和 ignore-hidden
屬性,因為它們預設為 true
。
Spring Integration 4.2 引入了 |
第二個通道配接器範例使用自訂篩選器,第三個使用 filename-pattern
屬性新增基於 AntPathMatcher
的篩選器,第四個使用 filename-regex
屬性將基於正則運算式模式的篩選器新增至 FileReadingMessageSource
。filename-pattern
和 filename-regex
屬性彼此互斥,且與常規 filter
參考屬性互斥。但是,您可以使用 filter
屬性來參考 CompositeFileListFilter
的實例,該實例組合了任意數量的篩選器,包括一個或多個基於模式的篩選器,以滿足您的特定需求。
當多個程序從同一個目錄讀取時,您可能想要鎖定檔案以防止它們同時被選取。若要執行此操作,您可以使用 FileLocker
。有一個基於 java.nio
的實作可用,但也可以實作您自己的鎖定方案。可以如下所示注入 nio
locker
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
您可以如下所示設定自訂 locker
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
當使用 locker 設定檔案輸入配接器時,它負責在允許接收檔案之前取得鎖定。它不承擔解鎖檔案的責任。如果您已處理檔案並保持鎖定懸而未決,則會發生記憶體洩漏。如果這是一個問題,您應該在適當的時間自行呼叫 FileLocker.unlock(File file) 。 |
當篩選和鎖定檔案不足時,您可能需要完全控制列出檔案的方式。若要實作此類型的需求,您可以使用 DirectoryScanner
的實作。此掃描器可讓您準確判斷每次輪詢中列出的檔案。這也是 Spring Integration 在內部用於將 FileListFilter
實例和 FileLocker
連接到 FileReadingMessageSource
的介面。您可以在 scanner
屬性上將自訂 DirectoryScanner
注入到 <int-file:inbound-channel-adapter/>
中,如下列範例所示
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
這樣做可讓您完全自由地選擇排序、列出和鎖定策略。
同樣重要的是要了解篩選器 (包括 patterns
、regex
、prevent-duplicates
和其他篩選器) 和 locker
實例實際上是由 scanner
使用的。在配接器上設定的任何這些屬性隨後都會注入到內部 scanner
中。對於外部 scanner
的情況,FileReadingMessageSource
上禁止所有篩選器和 locker 屬性。它們必須在該自訂 DirectoryScanner
上指定 (如果需要)。換句話說,如果您將 scanner
注入到 FileReadingMessageSource
中,則應在該 scanner
上提供 filter
和 locker
,而不是在 FileReadingMessageSource
上。
預設情況下,DefaultDirectoryScanner 使用 IgnoreHiddenFileListFilter 和 AcceptOnceFileListFilter 。若要防止使用它們,您可以設定自己的篩選器 (例如 AcceptAllFileListFilter ) 甚至將其設定為 null 。 |
WatchServiceDirectoryScanner
FileReadingMessageSource.WatchServiceDirectoryScanner
依賴檔案系統事件,當新檔案新增至目錄時。在初始化期間,目錄已註冊以產生事件。初始檔案列表也在初始化期間建立。在走訪目錄樹狀結構時,遇到的任何子目錄也會註冊以產生事件。在第一次輪詢時,傳回從走訪目錄的初始檔案列表。在後續輪詢中,傳回來自新建立事件的檔案。如果新增了新的子目錄,則其建立事件用於走訪新的子樹以尋找現有檔案並註冊找到的任何新子目錄。
當 WatchKey 的內部事件 queue 沒有被程式盡快耗盡時,目錄修改事件發生時會出現問題。如果佇列大小超出,則會發出 StandardWatchEventKinds.OVERFLOW 以指示某些檔案系統事件可能會遺失。在這種情況下,根目錄會完全重新掃描。為了避免重複,請考慮使用適當的 FileListFilter (例如 AcceptOnceFileListFilter ) 或在處理完成時移除檔案。 |
可以透過 FileReadingMessageSource.use-watch-service
選項啟用 WatchServiceDirectoryScanner
,該選項與 scanner
選項互斥。內部 FileReadingMessageSource.WatchServiceDirectoryScanner
實例會為提供的 directory
填入。
此外,現在 WatchService
輪詢邏輯可以追蹤 StandardWatchEventKinds.ENTRY_MODIFY
和 StandardWatchEventKinds.ENTRY_DELETE
。
如果您需要追蹤現有檔案的修改以及新檔案,則應在 FileListFilter
中實作 ENTRY_MODIFY
事件邏輯。否則,來自這些事件的檔案會以相同的方式處理。
ResettableFileListFilter
實作會選取 ENTRY_DELETE
事件。因此,會為其檔案提供 remove()
操作。當啟用此事件時,諸如 AcceptOnceFileListFilter
之類的篩選器會移除檔案。因此,如果出現同名檔案,它將通過篩選器並作為訊息傳送。
為此,已引入 watch-events
屬性 (FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
)。(WatchEventType
是 FileReadingMessageSource
中的公用內部列舉)。使用此選項,我們可以針對新檔案使用一個下游流程邏輯,並針對修改後的檔案使用一些其他邏輯。以下範例顯示如何在同一個目錄中為建立和修改事件設定不同的邏輯
值得一提的是,ENTRY_DELETE
事件涉及已監看目錄的子目錄的重新命名操作。更具體地說,與先前的目錄名稱相關的 ENTRY_DELETE
事件,在通知新 (重新命名) 目錄的 ENTRY_CREATE
事件之前。在某些作業系統 (例如 Windows) 上,必須註冊 ENTRY_DELETE
事件以處理這種情況。否則,在檔案總管中重新命名監看的子目錄可能會導致無法在該子目錄中偵測到新檔案。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
從版本 6.1 開始,FileReadingMessageSource
公開了兩個新的 WatchService
相關選項
-
watchMaxDepth
- 用於Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)
API 的引數(argument); -
watchDirPredicate
- 一個Predicate<Path>
,用於測試掃描樹狀結構中的目錄是否應被遍歷並註冊到WatchService
以及已設定的監看事件種類。
限制記憶體消耗
您可以使用 HeadDirectoryScanner
來限制記憶體中保留的檔案數量。這在掃描大型目錄時非常有用。透過 XML 配置,可以透過設定輸入通道配接器上的 queue-size
屬性來啟用此功能。
在 4.2 版本之前,此設定與使用任何其他篩選器不相容。任何其他篩選器(包括 prevent-duplicates="true"
)都會覆寫用於限制大小的篩選器。
使用 一般來說,在這種情況下,您應該移除已處理的檔案,而不是使用 |
使用 Java 配置進行配置
以下 Spring Boot 應用程式展示了如何使用 Java 配置來配置輸出配接器的範例
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 進行配置
以下 Spring Boot 應用程式展示了如何使用 Java DSL 來配置輸出配接器的範例
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
「tail」檔案
另一個常見的使用案例是從檔案的末尾(或尾部)取得「行」,並在新增行時捕獲新行。提供了兩種實作方式。第一種是 OSDelegatingFileTailingMessageProducer
,它使用原生 tail
命令(在具有該命令的作業系統上)。這通常是這些平台上最有效率的實作方式。對於沒有 tail
命令的作業系統,第二種實作方式 ApacheCommonsFileTailingMessageProducer
使用 Apache commons-io
Tailer
類別。
在這兩種情況下,檔案系統事件(例如檔案不可用和其他事件)都會使用正常的 Spring 事件發布機制發布為 ApplicationEvent
實例。此類事件的範例包括以下內容
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
先前範例中顯示的事件序列可能會發生,例如,當檔案輪替時。
從 5.0 版本開始,當 idleEventInterval
期間檔案中沒有資料時,會發出 FileTailingIdleEvent
。以下範例顯示了此類事件的外觀
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
並非所有支援 tail 命令的平台都提供這些狀態訊息。 |
從這些端點發出的訊息具有以下標頭
-
FileHeaders.ORIGINAL_FILE
:File
物件 -
FileHeaders.FILENAME
:檔案名稱 (File.getName()
)
在 5.0 版本之前的版本中,FileHeaders.FILENAME 標頭包含檔案絕對路徑的字串表示形式。您現在可以透過在原始檔案標頭上呼叫 getAbsolutePath() 來取得該字串表示形式。 |
以下範例使用預設選項 ('-F -n 0',表示從目前結尾追蹤檔案名稱) 建立原生配接器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下範例使用 '-F -n +0' 選項(表示追蹤檔案名稱,發出所有現有行)建立原生配接器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果 tail
命令失敗(在某些平台上,即使指定了 -F
,遺失檔案也會導致 tail
失敗),則每 10 秒重試該命令。
預設情況下,原生配接器會從標準輸出捕獲內容並將其作為訊息傳送。它們也會從標準錯誤捕獲內容以引發事件。從 4.3.6 版本開始,您可以透過將 enable-status-reader
設定為 false
來捨棄標準錯誤事件,如下列範例所示
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下範例中,IdleEventInterval
設定為 5000
,表示如果五秒鐘內沒有寫入任何行,則每五秒鐘觸發一次 FileTailingIdleEvent
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
當您需要停止配接器時,這可能會很有用。
以下範例建立了一個 Apache commons-io
Tailer
配接器,該配接器每兩秒檢查檔案是否有新行,並每十秒檢查一次遺失檔案是否存在
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
1 | 檔案從開頭 (end="false" ) 而不是結尾 (預設值) 開始追蹤。 |
2 | 每個區塊都會重新開啟檔案(預設值是保持檔案開啟)。 |
指定 delay 、end 或 reopen 屬性會強制使用 Apache commons-io 配接器,並使 native-options 屬性無法使用。 |