註解支援

除了使用 XML 命名空間支援設定訊息端點之外,您也可以使用註解。首先,Spring Integration 提供了類別層級的 @MessageEndpoint 作為原型註解,表示它本身使用 Spring 的 @Component 註解進行註解,因此會自動被 Spring 的元件掃描識別為 bean 定義。

更重要的是各種方法層級的註解。它們表示帶註解的方法能夠處理訊息。以下範例示範了類別層級和方法層級的註解

@MessageEndpoint
public class FooService {

    @ServiceActivator
    public void processMessage(Message message) {
        ...
    }
}

方法「處理」訊息的確切含義取決於特定的註解。Spring Integration 中可用的註解包括

如果您將 XML 設定與註解結合使用,則不需要 @MessageEndpoint 註解。如果您想要從 <service-activator/> 元素的 ref 屬性設定 POJO 參考,則只能提供方法層級的註解。在這種情況下,即使 <service-activator/> 元素上不存在方法層級屬性,註解也可以防止歧義。

在大多數情況下,帶註解的處理器方法不應要求 Message 類型作為其參數。相反地,方法參數類型可以符合訊息的 payload 類型,如下列範例所示

public class ThingService {

    @ServiceActivator
    public void bar(Thing thing) {
        ...
    }

}

當方法參數應從 MessageHeaders 中的值映射時,另一個選項是使用參數層級的 @Header 註解。一般而言,使用 Spring Integration 註解註解的方法可以接受 Message 本身、訊息 payload 或標頭值 (使用 @Header) 作為參數。事實上,方法可以接受組合,如下列範例所示

public class ThingService {

    @ServiceActivator
    public void otherThing(String payload, @Header("x") int valueX, @Header("y") int valueY) {
        ...
    }

}

您也可以使用 @Headers 註解來提供所有訊息標頭作為 Map,如下列範例所示

public class ThingService {

    @ServiceActivator
    public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}
註解的值也可以是 SpEL 表達式 (例如,someHeader.toUpperCase()),當您希望在注入標頭值之前操作它時,這非常有用。它還提供了一個可選的 required 屬性,用於指定屬性值是否必須在標頭中可用。required 屬性的預設值為 true

對於其中幾個註解,當訊息處理方法傳回非空值時,端點會嘗試傳送回覆。這在所有設定選項 (命名空間和註解) 中都是一致的,因為會使用此類端點的輸出通道 (如果可用),並且 REPLY_CHANNEL 訊息標頭值會用作後備。

端點上的輸出通道和回覆通道訊息標頭的組合啟用了管道方法,其中多個元件具有輸出通道,而最終元件允許將回覆訊息轉發到回覆通道 (如原始請求訊息中所指定)。換句話說,最終元件取決於原始傳送者提供的資訊,並且可以動態支援任意數量的用戶端。這是 回覆地址 模式的範例。

除了此處顯示的範例之外,這些註解也支援 inputChanneloutputChannel 屬性,如下列範例所示

@Service
public class ThingService {

    @ServiceActivator(inputChannel="input", outputChannel="output")
    public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
        ...
    }

}

這些註解的處理會建立與對應 XML 元件相同的 bean — AbstractEndpoint 實例和 MessageHandler 實例 (或輸入通道配接器的 MessageSource 實例)。請參閱 @Bean 方法上的註解。bean 名稱是從以下模式產生的:[componentName].[methodName].[decapitalizedAnnotationClassShortName]。在先前的範例中,bean 名稱是 thingService.otherThing.serviceActivator,適用於 AbstractEndpoint,而名稱相同但帶有額外的 .handler (.source) 後綴,適用於 MessageHandler (MessageSource) bean。可以使用 @EndpointId 註解以及這些訊息傳遞註解來自訂此類名稱。MessageHandler 實例 (MessageSource 實例) 也有資格由 訊息歷史記錄 追蹤。

從 4.0 版開始,所有訊息傳遞註解都提供 SmartLifecycle 選項 (autoStartupphase),以允許在應用程式內容初始化時控制端點生命週期。它們預設分別為 true0。若要變更端點的狀態 (例如 start()stop()),您可以透過使用 BeanFactory (或自動裝配) 取得端點 bean 的參考,並調用方法。或者,您可以將命令訊息傳送到 Control Bus (請參閱 控制匯流排)。對於這些目的,您應該使用前面段落中提及的 beanName

在剖析提及的註解 (當未設定特定通道 bean 時) 後自動建立的通道以及對應的消費者端點,會在內容初始化接近尾聲時宣告為 bean。這些 bean 可以 在其他服務中自動裝配,但它們必須標記為 @Lazy 註解,因為定義通常在正常自動裝配處理期間還不可用。

@Autowired
@Lazy
@Qualifier("someChannel")
MessageChannel someChannel;
...

@Bean
Thing1 dependsOnSPCA(@Qualifier("someInboundAdapter") @Lazy SourcePollingChannelAdapter someInboundAdapter) {
    ...
}

從 6.0 版開始,所有訊息傳遞註解現在都是 @Repeatable,因此可以在同一個服務方法上宣告多個相同類型,其含義是建立與重複註解一樣多的端點

@Transformer(inputChannel = "inputChannel1", outputChannel = "outputChannel1")
@Transformer(inputChannel = "inputChannel2", outputChannel = "outputChannel2")
public String transform(String input) {
    return input.toUpperCase();
}

使用 @Poller 註解

在 Spring Integration 4.0 之前,訊息傳遞註解要求 inputChannel 是對 SubscribableChannel 的參考。對於 PollableChannel 實例,需要 <int:bridge/> 元素來設定 <int:poller/> 並使複合端點成為 PollingConsumer。4.0 版引入了 @Poller 註解,以允許直接在訊息傳遞註解上設定 poller 屬性,如下列範例所示

public class AnnotationService {

    @Transformer(inputChannel = "input", outputChannel = "output",
        poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}"))
    public String handle(String payload) {
        ...
    }
}

@Poller 註解僅提供簡單的 PollerMetadata 選項。您可以使用屬性預留位置設定 @Poller 註解的屬性 (maxMessagesPerPollfixedDelayfixedRatecron)。此外,從 5.1 版開始,也提供了 PollingConsumerreceiveTimeout 選項。如果需要提供更多輪詢選項 (例如,transactionadvice-chainerror-handler 和其他選項),您應該將 PollerMetadata 設定為通用 bean,並將其 bean 名稱用作 @Pollervalue 屬性。在這種情況下,不允許使用其他屬性 (它們必須在 PollerMetadata bean 上指定)。請注意,如果 inputChannelPollableChannel 且未設定 @Poller,則會使用預設的 PollerMetadata (如果它存在於應用程式內容中)。若要使用 @Configuration 註解宣告預設 poller,請使用類似於以下範例的程式碼

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(10));
    return pollerMetadata;
}

以下範例示範如何使用預設 poller

public class AnnotationService {

    @Transformer(inputChannel = "aPollableChannel", outputChannel = "output")
    public String handle(String payload) {
        ...
    }
}

以下範例示範如何使用具名 poller

@Bean
public PollerMetadata myPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(1000));
    return pollerMetadata;
}

以下範例示範使用預設 poller 的端點

public class AnnotationService {

    @Transformer(inputChannel = "aPollableChannel", outputChannel = "output"
                           poller = @Poller("myPoller"))
    public String handle(String payload) {
         ...
    }
}

從 4.3.3 版開始,@Poller 註解具有 errorChannel 屬性,以便更輕鬆地設定基礎 MessagePublishingErrorHandler。此屬性扮演與 <poller> XML 元件中的 error-channel 相同的角色。請參閱 端點命名空間支援 以取得更多資訊。

訊息傳遞註解上的 poller() 屬性與 reactive() 屬性互斥。請參閱下一節以取得更多資訊。

使用 @Reactive 註解

ReactiveStreamsConsumer 自 5.0 版以來就已存在,但僅當端點的輸入通道是 FluxMessageChannel (或任何 org.reactivestreams.Publisher 實作) 時才應用它。從 5.3 版開始,當目標訊息處理器是 ReactiveMessageHandler 時,即使輸入通道類型與訊息處理器無關,框架也會建立其執行個體。@Reactive 子註解 (類似於上面提及的 @Poller) 已針對從 5.5 版開始的所有訊息傳遞註解引入。它接受可選的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> bean 參考,並且與輸入通道類型和訊息處理器無關,將目標端點變成 ReactiveStreamsConsumer 實例。函數從 Flux.transform() 運算子使用,以對來自輸入通道的反應式串流來源套用一些自訂 (publishOn()doOnNext()log()retry() 等)。

以下範例示範如何從輸入通道獨立於最終訂閱者和生產者變更發布執行緒到 DirectChannel

@Bean
public Function<Flux<?>, Flux<?>> publishOnCustomizer() {
    return flux -> flux.publishOn(Schedulers.parallel());
}

@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer"))
public void handleReactive(String payload) {
    ...
}

訊息傳遞註解上的 reactive() 屬性與 poller() 屬性互斥。請參閱 使用 @Poller 註解反應式串流支援 以取得更多資訊。

使用 @InboundChannelAdapter 註解

4.0 版引入了 @InboundChannelAdapter 方法層級註解。它根據帶註解方法的 MethodInvokingMessageSource 產生 SourcePollingChannelAdapter 整合元件。此註解是 <int:inbound-channel-adapter> XML 元件的類比,並且具有相同的限制:方法不能有參數,且傳回類型不得為 void。它有兩個屬性:value (必要的 MessageChannel bean 名稱) 和 poller (可選的 @Poller 註解,如 先前所述)。如果您需要提供一些 MessageHeaders,請使用 Message<?> 傳回類型,並使用 MessageBuilder 來建置 Message<?>。使用 MessageBuilder 可讓您設定 MessageHeaders。以下範例示範如何使用 @InboundChannelAdapter 註解

@InboundChannelAdapter("counterChannel")
public Integer count() {
    return this.counter.incrementAndGet();
}

@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixed-rate = "5000"))
public String foo() {
    return "foo";
}

4.3 版為 value 註解屬性引入了 channel 別名,以提供更好的原始碼可讀性。此外,目標 MessageChannel bean 在 SourcePollingChannelAdapter 中由提供的名稱 (由 outputChannelName 選項設定) 在第一個 receive() 呼叫時解析,而不是在初始化階段解析。它允許「延遲繫結」邏輯:從消費者角度來看,目標 MessageChannel bean 的建立和註冊時間比 @InboundChannelAdapter 剖析階段稍晚。

第一個範例要求預設 poller 已在應用程式內容中的其他位置宣告。

使用 @MessagingGateway 註解

使用 @IntegrationComponentScan 註解

標準 Spring Framework @ComponentScan 註解不會掃描介面以尋找原型 @Component 註解。為了克服此限制並允許設定 @MessagingGateway (請參閱 @MessagingGateway 註解),我們引入了 @IntegrationComponentScan 機制。此註解必須與 @Configuration 註解一起放置,並自訂以定義其掃描選項,例如 basePackagesbasePackageClasses。在這種情況下,所有探索到的帶有 @MessagingGateway 註解的介面都會被剖析並註冊為 GatewayProxyFactoryBean 實例。所有其他基於類別的元件都由標準 @ComponentScan 剖析。