訊息通道

除了具有 EIP 方法的 IntegrationFlowBuilder 之外,Java DSL 還提供流暢的 API 來設定 MessageChannel 實例。為此,提供了 MessageChannels Builder 工廠。以下範例示範如何使用它

@Bean
public PriorityChannelSpec priorityChannel() {
    return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap());
}

相同的 MessageChannels Builder 工廠可用於 IntegrationFlowBuilder 中的 channel() EIP 方法,以連接端點,類似於 XML 設定中連接 input-channel/output-channel 對。預設情況下,端點與 DirectChannel 實例連接,其中 Bean 名稱基於以下模式:[IntegrationFlow.beanName].channel#[channelNameIndex]。此規則也適用於使用內聯 MessageChannels Builder 工廠產生的未命名通道。但是,所有 MessageChannels 方法都有一個變體,可以識別 channelId,您可以使用它來設定 MessageChannel 實例的 Bean 名稱。MessageChannel 參考和 beanName 可以用作 Bean 方法調用。以下範例示範了使用 channel() EIP 方法的可能方式

@Bean
public QueueChannelSpec queueChannel() {
    return MessageChannels.queue();
}

@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
    return MessageChannels.publishSubscribe();
}

@Bean
public IntegrationFlow channelFlow() {
    return IntegrationFlow.from("input")
                .fixedSubscriberChannel()
                .channel("queueChannel")
                .channel(publishSubscribe())
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor))
                .channel("output")
                .get();
}
  • from("input") 表示「尋找並使用具有 "input" ID 的 MessageChannel,或建立一個」。

  • fixedSubscriberChannel() 產生 FixedSubscriberChannel 的實例,並以 channelFlow.channel#0 的名稱註冊它。

  • channel("queueChannel") 的工作方式相同,但使用現有的 queueChannel Bean。

  • channel(publishSubscribe()) 是 Bean 方法參考。

  • channel(MessageChannels.executor("executorChannel", this.taskExecutor))IntegrationFlowBuilder,它向 ExecutorChannel 公開 IntegrationComponentSpec,並將其註冊為 executorChannel

  • channel("output")DirectChannel Bean 註冊為 output 作為其名稱,只要不存在具有此名稱的 Bean。

注意:前面的 IntegrationFlow 定義是有效的,並且其所有通道都應用於具有 BridgeHandler 實例的端點。

請注意,從不同的 IntegrationFlow 實例中使用相同的內聯通道定義通過 MessageChannels 工廠。即使 DSL 解析器將不存在的物件註冊為 Bean,它也無法從不同的 IntegrationFlow 容器中確定相同的物件 (MessageChannel)。以下範例是錯誤的
@Bean
public IntegrationFlow startFlow() {
    return IntegrationFlow.from("input")
                .transform(...)
                .channel(MessageChannels.queue("queueChannel"))
                .get();
}

@Bean
public IntegrationFlow endFlow() {
    return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
                .handle(...)
                .get();
}

該錯誤範例的結果是以下例外

Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
     there is already object [queueChannel] bound
	    at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)

為了使其運作,您需要為該通道宣告 @Bean,並從不同的 IntegrationFlow 實例中使用其 Bean 方法。