檔案彙集器

從 5.5 版本開始,引入了 FileAggregator,以涵蓋在啟用 START/END 標記時 FileSplitter 用例的另一方面。為了方便起見,FileAggregator 實作了所有三個序列詳細資訊策略

  • HeaderAttributeCorrelationStrategyFileHeaders.FILENAME 屬性用於關聯鍵計算。當在 FileSplitter 上啟用標記時,它不會填入序列詳細資訊標頭,因為 START/END 標記訊息也包含在序列大小中。FileHeaders.FILENAME 仍然會為發出的每一行填入,包括 START/END 標記訊息。

  • FileMarkerReleaseStrategy - 檢查群組中是否有 FileSplitter.FileMarker.Mark.END 訊息,然後將 FileHeaders.LINE_COUNT 標頭值與群組大小減去 2 - FileSplitter.FileMarker 實例進行比較。它也實作了方便的 GroupConditionProvider 介面,用於 AbstractCorrelatingMessageHandler 中使用的 conditionSupplier 函數。有關更多資訊,請參閱 訊息群組條件

  • FileAggregatingMessageGroupProcessor 僅從群組中移除 FileSplitter.FileMarker 訊息,並將其餘訊息收集到列表酬載中以產生。

以下列表顯示設定 FileAggregator 的可能方式

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
    return f -> f
            .split(Files.splitter()
                    .markers()
                    .firstLineAsHeader("firstLine"))
            .channel(c -> c.executor(taskExecutor))
            .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
                    e -> e.discardChannel("aggregatorChannel"))
            .<String, String>transform(String::toUpperCase)
            .channel("aggregatorChannel")
            .aggregate(new FileAggregator())
            .channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
    integrationFlow {
        split(Files.splitter().markers().firstLineAsHeader("firstLine"))
        channel { executor(taskExecutor) }
        filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
        transform(String::toUpperCase)
        channel("aggregatorChannel")
        aggregate(FileAggregator())
        channel { queue("resultChannel") }
    }
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
    aggregator.setProcessorBean(new FileAggregator());
    aggregator.setOutputChannel(outputChannel);
    return aggregator;
}
<int:chain input-channel="input" output-channel="output">
    <int-file:splitter markers="true"/>
    <int:aggregator>
        <bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
    </int:aggregator>
</int:chain>

如果 FileAggregator 的預設行為不符合目標邏輯,建議使用個別策略設定彙集器端點。有關更多資訊,請參閱 FileAggregator JavaDocs。