子流程支援

if…​elsepublish-subscribe 元件中的某些元件提供使用子流程指定其邏輯或對應的功能。最簡單的範例是 .publishSubscribeChannel(),如下列範例所示

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

您可以使用個別的 IntegrationFlow @Bean 定義來達成相同的結果,但我們希望您會發現子流程樣式的邏輯組合很有用。我們發現這樣可以產生更簡短(因此更易於閱讀)的程式碼。

從 5.3 版開始,提供基於 BroadcastCapableChannelpublishSubscribeChannel() 實作,以在經紀商支援的訊息通道上設定子流程訂閱者。例如,我們現在可以在 Jms.publishSubscribeChannel() 上將多個訂閱者設定為子流程

@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub");
}

@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel,
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

類似的 publish-subscribe 子流程組合提供了 .routeToRecipients() 方法。

另一個範例是在 .filter() 方法上使用 .discardFlow() 而不是 .discardChannel()

.route() 值得特別關注。請考慮下列範例

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

.channelMapping() 繼續像在常規 Router 對應中一樣運作,但 .subFlowMapping() 將該子流程繫結到主流程。換句話說,任何路由器的子流程都會在 .route() 之後返回主流程。

有時,您需要從 .subFlowMapping() 參考現有的 IntegrationFlow @Bean。下列範例示範如何執行此操作

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}


在這種情況下,當您需要從此類子流程接收回覆並繼續主流程時,此 IntegrationFlow bean 參考(或其輸入通道)必須使用 .gateway() 包裝,如前一個範例所示。前一個範例中的 oddFlow() 參考未包裝到 .gateway()。因此,我們不期望從此路由分支收到回覆。否則,您最終會得到類似於以下的例外

Caused by: org.springframework.beans.factory.BeanCreationException:
    The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
    is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
    This is the end of the integration flow.

當您將子流程設定為 lambda 時,框架會處理與子流程的請求-回覆互動,而不需要閘道。

子流程可以巢狀到任何深度,但我們不建議這樣做。實際上,即使在路由器案例中,在流程中新增複雜的子流程也會很快開始看起來像一盤義大利麵,並且難以讓人解析。

在 DSL 支援子流程設定的情況下,當通常需要通道來設定元件,並且該子流程以 channel() 元素開頭時,框架會隱式地在元件輸出通道和流程輸入通道之間放置一個 bridge()。例如,在此 filter 定義中

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

框架會在內部建立 DirectChannel bean 以注入到 MessageFilter.discardChannel 中。然後,它將子流程包裝到以這個隱式通道開始訂閱的 IntegrationFlow 中,並在流程中指定的 channel() 之前放置一個 bridge。當現有的 IntegrationFlow bean 用作子流程參考(而不是內聯子流程,例如 lambda)時,不需要這樣的橋接器,因為框架可以從流程 bean 解析第一個通道。使用內聯子流程時,輸入通道尚不可用。