Spring Integration 互動

Spring Integration Framework 延伸了 Spring 程式設計模型,以支援廣為人知的企業整合模式。它在基於 Spring 的應用程式中實現了輕量級訊息傳遞,並支援透過宣告式适配器與外部系統整合。它還提供了一個高階 DSL,將各種操作(端點)組合成邏輯整合流程。憑藉此 DSL 配置的 lambda 風格,Spring Integration 已經在很大程度上採用了 java.util.function 介面。 @MessagingGateway 代理介面也可以作為 FunctionConsumer,根據 Spring Cloud Function 環境,可以將其註冊到函式目錄中。有關其對函式的支援的更多資訊,請參閱 Spring Integration 參考手冊

另一方面,從 4.0.3 版本開始,Spring Cloud Function 引入了 spring-cloud-function-integration 模組,該模組為從 Spring Integration DSL 角度與 FunctionCatalog 互動提供了更深入、更雲端特定且基於自動配置的 API。 FunctionFlowBuilder 已自動配置並透過 FunctionCatalog 自動連線,並代表目標 IntegrationFlow 實例的函式特定 DSL 的進入點。除了標準的 IntegrationFlow.from() 工廠(為了方便起見),FunctionFlowBuilder 還公開了 fromSupplier(String supplierDefinition) 工廠,以在提供的 FunctionCatalog 中查找目標 Supplier。然後,此 FunctionFlowBuilder 會導向 FunctionFlowDefinition。此 FunctionFlowDefinitionIntegrationFlowExtension 的實作,並公開 apply(String functionDefinition)accept(String consumerDefinition) 運算子,以分別從 FunctionCatalog 中查找 FunctionConsumer。 有關更多資訊,請參閱其 Javadoc。

以下範例示範了 FunctionFlowBuilder 的實際應用,以及 IntegrationFlow API 其餘部分的強大功能

@Configuration
public class IntegrationConfiguration {

    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}

由於 FunctionCatalog.lookup() 功能不僅限於簡單的函式名稱,因此函式組合功能也可以在上述 apply()accept() 運算子中使用

@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}

當我們將預定義函式的自動配置依賴項新增到 Spring Cloud 應用程式中時,此 API 變得更加相關。例如,Stream Applications 專案除了應用程式映像檔外,還提供了用於各種整合用例的函式工件,例如 debezium-supplierelasticsearch-consumeraggregator-function 等。

以下配置分別基於 http-supplierspel-functionfile-consumer

@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}

我們還需要做的就是將它們的配置新增到 application.properties 中(如果需要)

http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt