動態與執行階段整合流程
IntegrationFlow
及其所有相依元件可以在執行階段註冊。在 5.0 版本之前,我們使用 BeanFactory.registerSingleton()
hook。從 Spring Framework 5.0
開始,我們使用 instanceSupplier
hook 進行程式化的 BeanDefinition
註冊。以下範例顯示如何以程式方式註冊 bean
BeanDefinition beanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
.getRawBeanDefinition();
((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);
請注意,在前面的範例中,instanceSupplier
hook 是 genericBeanDefinition
方法的最後一個參數,在本例中由 lambda 提供。
所有必要的 bean 初始化和生命週期都會自動完成,就像標準上下文設定 bean 定義一樣。
為了簡化開發體驗,Spring Integration 引入了 IntegrationFlowContext
,以便在執行階段註冊和管理 IntegrationFlow
實例,如下列範例所示
@Autowired
private AbstractServerConnectionFactory server1;
@Autowired
private IntegrationFlowContext flowContext;
...
@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);
IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}
當我們有多個設定選項,並且必須建立多個相似流程的實例時,這非常有用。為此,我們可以迭代我們的選項,並在迴圈中建立和註冊 IntegrationFlow
實例。另一種變體是當我們的資料來源不是基於 Spring 時,因此我們必須即時建立它。以下範例顯示了 Reactive Streams 事件來源的範例
Flux<Message<?>> messageFlux =
Flux.just("1,2,3,4")
.map(v -> v.split(","))
.flatMapIterable(Arrays::asList)
.map(Integer::parseInt)
.map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
IntegrationFlow.from(messageFlux)
.<Integer, Integer>transform(p -> p * 2)
.channel(resultChannel)
.get();
this.integrationFlowContext.registration(integrationFlow)
.register();
IntegrationFlowRegistrationBuilder
(作為 IntegrationFlowContext.registration()
的結果)可用於為要註冊的 IntegrationFlow
指定 bean 名稱,控制其 autoStartup
,以及註冊非 Spring Integration bean。通常,這些額外的 bean 是連線工廠(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和還原序列化器,或任何其他需要的支援元件。
當您不再需要動態註冊的 IntegrationFlow
及其所有相依 bean 時,可以使用 IntegrationFlowRegistration.destroy()
回呼來移除它們。請參閱 IntegrationFlowContext
Javadoc 以取得更多資訊。
從 5.0.6 版開始,IntegrationFlow 定義中所有產生的 bean 名稱都以流程 ID 作為前綴。我們建議始終指定明確的流程 ID。否則,將在 IntegrationFlowContext 中啟動同步屏障,以產生 IntegrationFlow 的 bean 名稱並註冊其 bean。我們在這兩個操作上同步,以避免當相同的產生 bean 名稱可能用於不同的 IntegrationFlow 實例時發生競爭條件。 |
此外,從 5.0.6 版開始,註冊建構器 API 有一個新方法:useFlowIdAsPrefix()
。如果您希望宣告相同流程的多個實例,並避免流程中元件具有相同 ID 時發生 bean 名稱衝突,這非常有用,如下列範例所示
private void registerFlows() {
IntegrationFlowRegistration flow1 =
this.flowContext.registration(buildFlow(1234))
.id("tcp1")
.useFlowIdAsPrefix()
.register();
IntegrationFlowRegistration flow2 =
this.flowContext.registration(buildFlow(1235))
.id("tcp2")
.useFlowIdAsPrefix()
.register();
}
private IntegrationFlow buildFlow(int port) {
return f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());
}
在這種情況下,第一個流程的訊息處理器可以使用 tcp1.client.handler
的 bean 名稱來參考。
當您使用 useFlowIdAsPrefix() 時,需要 id 屬性。 |