訊息通道
除了具有 EIP 方法的 IntegrationFlowBuilder
之外,Java DSL 還提供流暢的 API 來設定 MessageChannel
實例。為此,提供了 MessageChannels
Builder 工廠。以下範例示範如何使用它
@Bean
public PriorityChannelSpec priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap());
}
相同的 MessageChannels
Builder 工廠可用於 IntegrationFlowBuilder
中的 channel()
EIP 方法,以連接端點,類似於 XML 設定中連接 input-channel
/output-channel
對。預設情況下,端點與 DirectChannel
實例連接,其中 Bean 名稱基於以下模式:[IntegrationFlow.beanName].channel#[channelNameIndex]
。此規則也適用於使用內聯 MessageChannels
Builder 工廠產生的未命名通道。但是,所有 MessageChannels
方法都有一個變體,可以識別 channelId
,您可以使用它來設定 MessageChannel
實例的 Bean 名稱。MessageChannel
參考和 beanName
可以用作 Bean 方法調用。以下範例示範了使用 channel()
EIP 方法的可能方式
@Bean
public QueueChannelSpec queueChannel() {
return MessageChannels.queue();
}
@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
return MessageChannels.publishSubscribe();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlow.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
-
from("input")
表示「尋找並使用具有 "input" ID 的MessageChannel
,或建立一個」。 -
fixedSubscriberChannel()
產生FixedSubscriberChannel
的實例,並以channelFlow.channel#0
的名稱註冊它。 -
channel("queueChannel")
的工作方式相同,但使用現有的queueChannel
Bean。 -
channel(publishSubscribe())
是 Bean 方法參考。 -
channel(MessageChannels.executor("executorChannel", this.taskExecutor))
是IntegrationFlowBuilder
,它向ExecutorChannel
公開IntegrationComponentSpec
,並將其註冊為executorChannel
。 -
channel("output")
將DirectChannel
Bean 註冊為output
作為其名稱,只要不存在具有此名稱的 Bean。
注意:前面的 IntegrationFlow
定義是有效的,並且其所有通道都應用於具有 BridgeHandler
實例的端點。
請注意,從不同的 IntegrationFlow 實例中使用相同的內聯通道定義通過 MessageChannels 工廠。即使 DSL 解析器將不存在的物件註冊為 Bean,它也無法從不同的 IntegrationFlow 容器中確定相同的物件 (MessageChannel )。以下範例是錯誤的 |
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlow.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
該錯誤範例的結果是以下例外
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
為了使其運作,您需要為該通道宣告 @Bean
,並從不同的 IntegrationFlow
實例中使用其 Bean 方法。