Spring Integration 互動
Spring Integration Framework 延伸了 Spring 程式設計模型,以支援廣為人知的企業整合模式。它在基於 Spring 的應用程式中實現了輕量級訊息傳遞,並支援透過宣告式适配器與外部系統整合。它還提供了一個高階 DSL,將各種操作(端點)組合成邏輯整合流程。憑藉此 DSL 配置的 lambda 風格,Spring Integration 已經在很大程度上採用了 java.util.function
介面。 @MessagingGateway
代理介面也可以作為 Function
或 Consumer
,根據 Spring Cloud Function 環境,可以將其註冊到函式目錄中。有關其對函式的支援的更多資訊,請參閱 Spring Integration 參考手冊。
另一方面,從 4.0.3
版本開始,Spring Cloud Function 引入了 spring-cloud-function-integration
模組,該模組為從 Spring Integration DSL 角度與 FunctionCatalog
互動提供了更深入、更雲端特定且基於自動配置的 API。 FunctionFlowBuilder
已自動配置並透過 FunctionCatalog
自動連線,並代表目標 IntegrationFlow
實例的函式特定 DSL 的進入點。除了標準的 IntegrationFlow.from()
工廠(為了方便起見),FunctionFlowBuilder
還公開了 fromSupplier(String supplierDefinition)
工廠,以在提供的 FunctionCatalog
中查找目標 Supplier
。然後,此 FunctionFlowBuilder
會導向 FunctionFlowDefinition
。此 FunctionFlowDefinition
是 IntegrationFlowExtension
的實作,並公開 apply(String functionDefinition)
和 accept(String consumerDefinition)
運算子,以分別從 FunctionCatalog
中查找 Function
或 Consumer
。 有關更多資訊,請參閱其 Javadoc。
以下範例示範了 FunctionFlowBuilder
的實際應用,以及 IntegrationFlow
API 其餘部分的強大功能
@Configuration
public class IntegrationConfiguration {
@Bean
Supplier<byte[]> simpleByteArraySupplier() {
return "simple test data"::getBytes;
}
@Bean
Function<String, String> upperCaseFunction() {
return String::toUpperCase;
}
@Bean
BlockingQueue<String> results() {
return new LinkedBlockingQueue<>();
}
@Bean
Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
return results::add;
}
@Bean
QueueChannel wireTapChannel() {
return new QueueChannel();
}
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("simpleByteArraySupplier")
.wireTap("wireTapChannel")
.apply("upperCaseFunction")
.log(LoggingHandler.Level.WARN)
.accept("simpleStringConsumer");
}
}
由於 FunctionCatalog.lookup()
功能不僅限於簡單的函式名稱,因此函式組合功能也可以在上述 apply()
和 accept()
運算子中使用
@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.from("functionCompositionInput")
.accept("upperCaseFunction|simpleStringConsumer");
}
當我們將預定義函式的自動配置依賴項新增到 Spring Cloud 應用程式中時,此 API 變得更加相關。例如,Stream Applications 專案除了應用程式映像檔外,還提供了用於各種整合用例的函式工件,例如 debezium-supplier
、elasticsearch-consumer
、aggregator-function
等。
以下配置分別基於 http-supplier
、spel-function
和 file-consumer
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
.<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
.channel(c -> c.flux())
.apply("spelFunction")
.<String, String>transform(String::toUpperCase)
.accept("fileConsumer");
}
我們還需要做的就是將它們的配置新增到 application.properties
中(如果需要)
http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt