檔案分割器
FileSplitter
在 4.1.2 版本中新增,其命名空間支援在 4.2 版本中新增。FileSplitter
基於 BufferedReader.readLine()
將文字檔案分割成個別行。預設情況下,分割器使用 Iterator
逐行發送從檔案讀取的行。將 iterator
屬性設定為 false
會使其在將所有行作為訊息發送之前,先將所有行讀取到記憶體中。這樣做的一個使用案例可能是,您希望在發送包含行的任何訊息之前,偵測檔案上的 I/O 錯誤。但是,這僅適用於相對較短的檔案。
輸入酬載可以是 File
、String
(File
路徑)、InputStream
或 Reader
。其他酬載類型會保持不變發送。
以下列表顯示設定 FileSplitter
的可能方式
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
1 | 分割器的 Bean 名稱。 |
2 | 設定為 true (預設值)以使用迭代器,或設定為 false 以在發送行之前將檔案載入到記憶體中。 |
3 | 設定為 true 以在檔案資料之前和之後發送檔案開始和檔案結束標記訊息。標記是具有 FileSplitter.FileMarker 酬載的訊息(在 mark 屬性中具有 START 和 END 值)。當您在下游流程中循序處理檔案(其中某些行被篩選)時,可以使用標記。它們使下游處理能夠知道檔案何時已完全處理。此外,包含 START 或 END 的 file_marker 標頭會新增至這些訊息。END 標記包含行數。如果檔案為空,則只會發送 START 和 END 標記,且 lineCount 為 0 。預設值為 false 。當為 true 時,apply-sequence 預設為 false 。另請參閱 markers-json (下一個屬性)。 |
4 | 當 markers 為 true 時,將此設定為 true 以將 FileMarker 物件轉換為 JSON 字串。(底層使用 SimpleJsonSerializer )。 |
5 | 設定為 false 以停用在訊息中包含 sequenceSize 和 sequenceNumber 標頭。預設值為 true ,除非 markers 為 true 。當 true 且 markers 為 true 時,標記會包含在排序中。當 true 且 iterator 為 true 時,sequenceSize 標頭會設定為 0 ,因為大小未知。 |
6 | 設定為 true 以在檔案中沒有行時拋出 RequiresReplyException 。預設值為 false 。 |
7 | 設定用於將文字資料讀取到 String 酬載中的字元集名稱。預設值為平台字元集。 |
8 | 第一行的標頭名稱,將作為標頭在為剩餘行發送的訊息中攜帶。自 5.0 版本起。 |
9 | 設定用於將訊息發送到分割器的輸入通道。 |
10 | 設定訊息發送到的輸出通道。 |
11 | 設定發送逾時。僅在 output-channel 可以阻塞時適用 — 例如完整的 QueueChannel 。 |
12 | 設定為 false 以在內容重新整理時停用自動啟動分割器。預設值為 true 。 |
13 | 如果 input-channel 是 <publish-subscribe-channel/> ,則設定此端點的順序。 |
14 | 設定分割器的啟動階段(在 auto-startup 為 true 時使用)。 |
FileSplitter
也會將任何基於文字的 InputStream
分割成行。從 4.3 版本開始,當與 FTP 或 SFTP 串流輸入通道配接器或使用 stream
選項檢索檔案的 FTP 或 SFTP 輸出閘道器結合使用時,當檔案完全使用完畢時,分割器會自動關閉支援串流的 session。請參閱 FTP 串流輸入通道配接器 和 SFTP 串流輸入通道配接器 以及 FTP 輸出閘道器 和 SFTP 輸出閘道器 以取得有關這些功能的更多資訊。
當使用 Java 設定時,可以使用額外的建構子,如下列範例所示
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
當 markersJson
為 true 時,標記會表示為 JSON 字串(使用 SimpleJsonSerializer
)。
5.0 版本引入了 firstLineAsHeader
選項,用於指定內容的第一行是標頭(例如 CSV 檔案中的欄位名稱)。傳遞給此屬性的引數是標頭名稱,第一行將作為標頭在為剩餘行發送的訊息中攜帶。此行不包含在序列標頭中(如果 applySequence
為 true),也不包含在與 FileMarker.END
相關聯的 lineCount
中。注意:從 5.5 版本開始,lineCount
也作為 FileHeaders.LINE_COUNT
包含在 FileMarker.END
訊息的標頭中,因為 FileMarker
可以序列化為 JSON。如果檔案僅包含標頭行,則該檔案會被視為空檔案,因此,在分割期間只會發送 FileMarker
實例(如果已啟用標記 — 否則,不會發送任何訊息)。預設情況下(如果未設定標頭名稱),第一行會被視為資料,並成為第一個發送訊息的酬載。
如果您需要關於從檔案內容中提取標頭的更複雜邏輯(不是第一行,不是行的全部內容,不是一個特定的標頭等等),請考慮在 FileSplitter
之前使用 標頭豐富器。請注意,已移動到標頭的行可能會在下游從正常內容處理中篩選掉。
分割檔案的冪等下游處理
當 apply-sequence
為 true 時,分割器會在 SEQUENCE_NUMBER
標頭中新增行號(當 markers
為 true 時,標記會計為行)。行號可以與 冪等接收器 一起使用,以避免在重新啟動後重新處理行。
例如
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}