檔案分割器

FileSplitter 在 4.1.2 版本中新增,其命名空間支援在 4.2 版本中新增。FileSplitter 基於 BufferedReader.readLine() 將文字檔案分割成個別行。預設情況下,分割器使用 Iterator 逐行發送從檔案讀取的行。將 iterator 屬性設定為 false 會使其在將所有行作為訊息發送之前,先將所有行讀取到記憶體中。這樣做的一個使用案例可能是,您希望在發送包含行的任何訊息之前,偵測檔案上的 I/O 錯誤。但是,這僅適用於相對較短的檔案。

輸入酬載可以是 FileStringFile 路徑)、InputStreamReader。其他酬載類型會保持不變發送。

以下列表顯示設定 FileSplitter 的可能方式

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)
1 分割器的 Bean 名稱。
2 設定為 true(預設值)以使用迭代器,或設定為 false 以在發送行之前將檔案載入到記憶體中。
3 設定為 true 以在檔案資料之前和之後發送檔案開始和檔案結束標記訊息。標記是具有 FileSplitter.FileMarker 酬載的訊息(在 mark 屬性中具有 STARTEND 值)。當您在下游流程中循序處理檔案(其中某些行被篩選)時,可以使用標記。它們使下游處理能夠知道檔案何時已完全處理。此外,包含 STARTENDfile_marker 標頭會新增至這些訊息。END 標記包含行數。如果檔案為空,則只會發送 STARTEND 標記,且 lineCount0。預設值為 false。當為 true 時,apply-sequence 預設為 false。另請參閱 markers-json(下一個屬性)。
4 markers 為 true 時,將此設定為 true 以將 FileMarker 物件轉換為 JSON 字串。(底層使用 SimpleJsonSerializer)。
5 設定為 false 以停用在訊息中包含 sequenceSizesequenceNumber 標頭。預設值為 true,除非 markerstrue。當 truemarkerstrue 時,標記會包含在排序中。當 trueiteratortrue 時,sequenceSize 標頭會設定為 0,因為大小未知。
6 設定為 true 以在檔案中沒有行時拋出 RequiresReplyException。預設值為 false
7 設定用於將文字資料讀取到 String 酬載中的字元集名稱。預設值為平台字元集。
8 第一行的標頭名稱,將作為標頭在為剩餘行發送的訊息中攜帶。自 5.0 版本起。
9 設定用於將訊息發送到分割器的輸入通道。
10 設定訊息發送到的輸出通道。
11 設定發送逾時。僅在 output-channel 可以阻塞時適用 — 例如完整的 QueueChannel
12 設定為 false 以在內容重新整理時停用自動啟動分割器。預設值為 true
13 如果 input-channel<publish-subscribe-channel/>,則設定此端點的順序。
14 設定分割器的啟動階段(在 auto-startuptrue 時使用)。

FileSplitter 也會將任何基於文字的 InputStream 分割成行。從 4.3 版本開始,當與 FTP 或 SFTP 串流輸入通道配接器或使用 stream 選項檢索檔案的 FTP 或 SFTP 輸出閘道器結合使用時,當檔案完全使用完畢時,分割器會自動關閉支援串流的 session。請參閱 FTP 串流輸入通道配接器SFTP 串流輸入通道配接器 以及 FTP 輸出閘道器SFTP 輸出閘道器 以取得有關這些功能的更多資訊。

當使用 Java 設定時,可以使用額外的建構子,如下列範例所示

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

markersJson 為 true 時,標記會表示為 JSON 字串(使用 SimpleJsonSerializer)。

5.0 版本引入了 firstLineAsHeader 選項,用於指定內容的第一行是標頭(例如 CSV 檔案中的欄位名稱)。傳遞給此屬性的引數是標頭名稱,第一行將作為標頭在為剩餘行發送的訊息中攜帶。此行不包含在序列標頭中(如果 applySequence 為 true),也不包含在與 FileMarker.END 相關聯的 lineCount 中。注意:從 5.5 版本開始,lineCount 也作為 FileHeaders.LINE_COUNT 包含在 FileMarker.END 訊息的標頭中,因為 FileMarker 可以序列化為 JSON。如果檔案僅包含標頭行,則該檔案會被視為空檔案,因此,在分割期間只會發送 FileMarker 實例(如果已啟用標記 — 否則,不會發送任何訊息)。預設情況下(如果未設定標頭名稱),第一行會被視為資料,並成為第一個發送訊息的酬載。

如果您需要關於從檔案內容中提取標頭的更複雜邏輯(不是第一行,不是行的全部內容,不是一個特定的標頭等等),請考慮在 FileSplitter 之前使用 標頭豐富器。請注意,已移動到標頭的行可能會在下游從正常內容處理中篩選掉。

分割檔案的冪等下游處理

apply-sequence 為 true 時,分割器會在 SEQUENCE_NUMBER 標頭中新增行號(當 markers 為 true 時,標記會計為行)。行號可以與 冪等接收器 一起使用,以避免在重新啟動後重新處理行。

例如

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}