動態與執行階段整合流程

IntegrationFlow 及其所有相依元件可以在執行階段註冊。在 5.0 版本之前,我們使用 BeanFactory.registerSingleton() hook。從 Spring Framework 5.0 開始,我們使用 instanceSupplier hook 進行程式化的 BeanDefinition 註冊。以下範例顯示如何以程式方式註冊 bean

BeanDefinition beanDefinition =
         BeanDefinitionBuilder.genericBeanDefinition((Class<Object>) bean.getClass(), () -> bean)
               .getRawBeanDefinition();

((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition(beanName, beanDefinition);

請注意,在前面的範例中,instanceSupplier hook 是 genericBeanDefinition 方法的最後一個參數,在本例中由 lambda 提供。

所有必要的 bean 初始化和生命週期都會自動完成,就像標準上下文設定 bean 定義一樣。

為了簡化開發體驗,Spring Integration 引入了 IntegrationFlowContext,以便在執行階段註冊和管理 IntegrationFlow 實例,如下列範例所示

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
    TestingUtilities.waitListening(this.server1, null);

    IntegrationFlow flow = f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client1"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());

    IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
    assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

當我們有多個設定選項,並且必須建立多個相似流程的實例時,這非常有用。為此,我們可以迭代我們的選項,並在迴圈中建立和註冊 IntegrationFlow 實例。另一種變體是當我們的資料來源不是基於 Spring 時,因此我們必須即時建立它。以下範例顯示了 Reactive Streams 事件來源的範例

Flux<Message<?>> messageFlux =
    Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);

QueueChannel resultChannel = new QueueChannel();

IntegrationFlow integrationFlow =
    IntegrationFlow.from(messageFlux)
        .<Integer, Integer>transform(p -> p * 2)
        .channel(resultChannel)
        .get();

this.integrationFlowContext.registration(integrationFlow)
            .register();

IntegrationFlowRegistrationBuilder(作為 IntegrationFlowContext.registration() 的結果)可用於為要註冊的 IntegrationFlow 指定 bean 名稱,控制其 autoStartup,以及註冊非 Spring Integration bean。通常,這些額外的 bean 是連線工廠(AMQP、JMS、(S)FTP、TCP/UDP 等)、序列化器和還原序列化器,或任何其他需要的支援元件。

當您不再需要動態註冊的 IntegrationFlow 及其所有相依 bean 時,可以使用 IntegrationFlowRegistration.destroy() 回呼來移除它們。請參閱 IntegrationFlowContext Javadoc 以取得更多資訊。

從 5.0.6 版開始,IntegrationFlow 定義中所有產生的 bean 名稱都以流程 ID 作為前綴。我們建議始終指定明確的流程 ID。否則,將在 IntegrationFlowContext 中啟動同步屏障,以產生 IntegrationFlow 的 bean 名稱並註冊其 bean。我們在這兩個操作上同步,以避免當相同的產生 bean 名稱可能用於不同的 IntegrationFlow 實例時發生競爭條件。

此外,從 5.0.6 版開始,註冊建構器 API 有一個新方法:useFlowIdAsPrefix()。如果您希望宣告相同流程的多個實例,並避免流程中元件具有相同 ID 時發生 bean 名稱衝突,這非常有用,如下列範例所示

private void registerFlows() {
    IntegrationFlowRegistration flow1 =
              this.flowContext.registration(buildFlow(1234))
                    .id("tcp1")
                    .useFlowIdAsPrefix()
                    .register();

    IntegrationFlowRegistration flow2 =
              this.flowContext.registration(buildFlow(1235))
                    .id("tcp2")
                    .useFlowIdAsPrefix()
                    .register();
}

private IntegrationFlow buildFlow(int port) {
    return f -> f
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                    .serializer(TcpCodecs.crlf())
                    .deserializer(TcpCodecs.lengthHeader1())
                    .id("client"))
                .remoteTimeout(m -> 5000))
            .transform(Transformers.objectToString());
}

在這種情況下,第一個流程的訊息處理器可以使用 tcp1.client.handler 的 bean 名稱來參考。

當您使用 useFlowIdAsPrefix() 時,需要 id 屬性。