訊息閘道

閘道隱藏了 Spring Integration 提供的訊息傳遞 API。它讓您的應用程式業務邏輯無需感知 Spring Integration API。透過使用通用閘道,您的程式碼僅與簡單的介面互動。

進入 GatewayProxyFactoryBean

如先前所述,最好不要依賴 Spring Integration API,包括閘道類別。因此,Spring Integration 提供了 GatewayProxyFactoryBean,它為任何介面產生代理,並在內部調用如下所示的閘道方法。透過使用依賴注入,您可以將介面公開給您的業務方法。

以下範例顯示可用於與 Spring Integration 互動的介面

public interface Cafe {

    void placeOrder(Order order);

}

閘道 XML 命名空間支援

也提供了命名空間支援。它讓您可以將介面設定為服務,如下列範例所示

<int:gateway id="cafeService"
         service-interface="org.cafeteria.Cafe"
         default-request-channel="requestChannel"
         default-reply-timeout="10000"
         default-reply-channel="replyChannel"/>

定義此組態後,現在可以將 cafeService 注入到其他 bean 中,且調用 Cafe 介面的代理實例上的方法之程式碼,無需感知 Spring Integration API。請參閱「範例」附錄,以取得使用 gateway 元素的範例(在 Cafe 示範中)。

前述組態中的預設值會套用至閘道介面上的所有方法。如果未指定回覆逾時,則呼叫執行緒會等待回覆 30 秒。請參閱沒有收到回覆時的閘道行為

預設值可以針對個別方法覆寫。請參閱使用註解和 XML 進行閘道組態

設定預設回覆通道

通常,您不需要指定 default-reply-channel,因為閘道會自動建立暫時的匿名回覆通道,在其中監聽回覆。但是,在某些情況下,可能會提示您定義 default-reply-channel(或搭配配接器閘道 (例如 HTTP、JMS 和其他) 的 reply-channel)。

為了提供一些背景資訊,我們簡要討論閘道的一些內部運作方式。閘道會建立暫時的點對點回覆通道。它是匿名的,並以名稱 replyChannel 新增至訊息標頭。當提供明確的 default-reply-channel(遠端配接器閘道的 reply-channel)時,您可以指向發佈-訂閱通道,之所以如此命名,是因為您可以為其新增多個訂閱者。在內部,Spring Integration 會在暫時的 replyChannel 和明確定義的 default-reply-channel 之間建立橋接器。

假設您希望您的回覆不僅發送到閘道,還發送到其他消費者。在這種情況下,您需要兩件事

  • 您可以訂閱的具名通道

  • 該通道是發佈-訂閱通道

閘道使用的預設策略無法滿足這些需求,因為新增至標頭的回覆通道是匿名的且為點對點。這表示沒有其他訂閱者可以取得其控制代碼,即使可以,通道也具有點對點行為,因此只有一個訂閱者會收到訊息。透過定義 default-reply-channel,您可以指向您選擇的通道。在這種情況下,它是 publish-subscribe-channel。閘道會從其建立到儲存在標頭中的暫時匿名回覆通道的橋接器。

您可能也想要明確提供回覆通道,以透過攔截器進行監控或稽核(例如,wiretap)。若要設定通道攔截器,您需要具名通道。

從 5.4 版開始,當閘道方法傳回類型為 void 時,如果未明確提供 replyChannel 標頭,則框架會將 replyChannel 標頭填入為 nullChannel bean 參考。這允許捨棄來自下游流程的任何可能回覆,以滿足單向閘道合約。

使用註解和 XML 進行閘道組態

考量以下範例,其透過新增 @Gateway 註解來擴充先前的 Cafe 介面範例

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

@Header 註解可讓您新增解譯為訊息標頭的值,如下列範例所示

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

如果您偏好使用 XML 方法來設定閘道方法,您可以將 method 元素新增至閘道組態,如下列範例所示

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB"/>
  <int:method name="echoViaDefault"/>
</int:gateway>

您也可以使用 XML 為每個方法調用提供個別標頭。如果您想要設定的標頭本質上是靜態的,且您不想使用 @Header 註解將其嵌入閘道的方法簽章中,這會很有用。例如,在貸款仲介範例中,我們想要根據啟動的要求類型(單一報價或所有報價)來影響貸款報價的彙集方式。雖然可以透過評估調用哪個閘道方法來判斷要求類型,但這會違反關注點分離範例(方法是 Java 構件)。但是,在訊息傳遞架構中,在訊息標頭中表達您的意圖(中繼資訊)是很自然的。以下範例顯示如何為兩個方法中的每一個新增不同的訊息標頭

<int:gateway id="loanBrokerGateway"
         service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
  <int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
  <int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="ALL"/>
  </int:method>
</int:gateway>

在先前的範例中,會根據閘道的方法,為 'RESPONSE_TYPE' 標頭設定不同的值。

例如,如果您在 <int:method/> 以及 @Gateway 註解中指定 requestChannel,則註解值優先。
如果在 XML 中指定無引數閘道,且介面方法同時具有 @Payload@Gateway 註解(在 <int:method/> 元素中使用 payloadExpressionpayload-expression),則會忽略 @Payload 值。

運算式和「全域」標頭

<header/> 元素支援 expression 作為 value 的替代方案。會評估 SpEL 運算式以判斷標頭的值。從 5.2 版開始,評估內容的 #root 物件是具有 getMethod()getArgs() 存取器的 MethodArgsHolder。例如,如果您想要依簡單方法名稱進行路由,您可以新增具有下列運算式的標頭:method.name

java.reflect.Method 不可序列化。如果您稍後序列化訊息,則具有 method 運算式的標頭會遺失。因此,在這些情況下,您可能希望使用 method.namemethod.toString()toString() 方法提供方法的 String 表示法,包括參數和傳回類型。

從 3.0 版開始,可以定義 <default-header/> 元素,以將標頭新增至閘道產生的所有訊息,無論調用的方法為何。為方法定義的特定標頭優先於預設標頭。在此為方法定義的特定標頭會覆寫服務介面中的任何 @Header 註解。但是,預設標頭不會覆寫服務介面中的任何 @Header 註解。

閘道現在也支援 default-payload-expression,其適用於所有方法(除非覆寫)。

將方法引數對應到訊息

前一節中的組態技術可讓您控制方法引數如何對應到訊息元素(payload 和標頭)。當未使用明確組態時,會使用某些慣例來執行對應。在某些情況下,這些慣例無法判斷哪個引數是 payload,以及哪個引數應對應到標頭。考量以下範例

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

在第一種情況下,慣例是將第一個引數對應到 payload(只要它不是 Map),而第二個引數的內容會變成標頭。

在第二種情況下(或當參數 thing1 的引數為 Map 時的第一種情況下),框架無法判斷哪個引數應為 payload。因此,對應失敗。這通常可以使用 payload-expression@Payload 註解或 @Headers 註解來解決。

或者(以及每當慣例崩潰時),您可以承擔將方法調用對應到訊息的全部責任。若要執行此動作,請實作 MethodArgsMessageMapper,並使用 mapper 屬性將其提供給 <gateway/>。mapper 會對應 MethodArgsHolder,這是一個簡單的類別,用於包裝 java.reflect.Method 實例和包含引數的 Object[]。當提供自訂 mapper 時,不允許在閘道上使用 default-payload-expression 屬性和 <default-header/> 元素。同樣地,不允許在任何 <method/> 元素上使用 payload-expression 屬性和 <header/> 元素。

對應方法引數

以下範例顯示方法引數如何對應到訊息,並顯示一些組態無效的範例

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("args[0] + args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  (1)

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
1 請注意,在此範例中,SpEL 變數 #this 是指引數,在此案例中為 s 的值。

XML 等效項目看起來有點不同,因為方法引數沒有 #this 內容。但是,運算式可以使用 MethodArgsHolder 根物件的 args 屬性來參考方法引數(如需更多資訊,請參閱運算式和「全域」標頭),如下列範例所示

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
  <int:method name="send1" payload-expression="args[0] + 'thing2'"/>
  <int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
  <int:method name="send3" payload-expression="method"/>
  <int:method name="send4">
    <int:header name="thing1" expression="args[2].toUpperCase()"/>
  </int:method>
</int:gateway>

@MessagingGateway 註解

從 4.0 版開始,閘道服務介面可以使用 @MessagingGateway 註解標記,而不需要定義 <gateway /> xml 元素進行組態。以下成對範例比較了設定相同閘道的兩種方法

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}
與 XML 版本類似,當 Spring Integration 在元件掃描期間探索到這些註解時,它會使用其訊息傳遞基礎結構建立 proxy 實作。若要執行此掃描並在應用程式內容中註冊 BeanDefinition,請將 @IntegrationComponentScan 註解新增至 @Configuration 類別。標準 @ComponentScan 基礎結構不處理介面。因此,我們引入了自訂 @IntegrationComponentScan 邏輯,以在介面上尋找 @MessagingGateway 註解,並為其註冊 GatewayProxyFactoryBean 實例。另請參閱註解支援

除了 @MessagingGateway 註解之外,您可以使用 @Profile 註解標記服務介面,以避免在設定檔未處於作用中狀態時建立 bean。

從 6.0 版開始,具有 @MessagingGateway 的介面也可以使用 @Primary 註解標記,以進行各自的組態邏輯,就像任何 Spring @Component 定義一樣。

從 6.0 版開始,@MessagingGateway 介面可以在標準 Spring @Import 組態中使用。這可以用作 @IntegrationComponentScan 或手動 AnnotationGatewayProxyFactoryBean bean 定義的替代方案。

@MessagingGateway6.0 版開始使用 @MessageEndpoint 進行 Meta-annotated,且 name() 屬性基本上是 @Compnent.value() 的別名。這樣一來,閘道代理的 bean 名稱產生策略會與已掃描和匯入元件的標準 Spring 註解組態重新對齊。預設 AnnotationBeanNameGenerator 可以透過 AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR 全域覆寫,或作為 @IntegrationComponentScan.nameGenerator() 屬性覆寫。

如果您沒有 XML 組態,則至少在一個 @Configuration 類別上需要 @EnableIntegration 註解。如需更多資訊,請參閱組態和 @EnableIntegration

調用無引數方法

當在沒有任何引數的閘道介面上調用方法時,預設行為是從 PollableChannel 接收 Message

但是,有時您可能想要觸發無引數方法,以便您可以與不需要使用者提供參數的其他下游元件互動,例如觸發無引數 SQL 調用或預存程序。

若要達成傳送和接收語意,您必須提供 payload。若要產生 payload,介面上的方法參數不是必要的。您可以使用 @Payload 註解或 XML 中 method 元素上的 payload-expression 屬性。以下列表包含 payload 可能的一些範例

  • 常值字串

  • #gatewayMethod.name

  • new java.util.Date()

  • @someBean.someMethod() 的傳回值

以下範例顯示如何使用 @Payload 註解

public interface Cafe {

    @Payload("new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

您也可以使用 @Gateway 註解。

public interface Cafe {

    @Gateway(payloadExpression = "new java.util.Date()")
    List<Order> retrieveOpenOrders();

}
如果這兩個註解都存在(且提供了 payloadExpression),則 @Gateway 優先。

如果方法沒有引數且沒有傳回值,但確實包含 payload 運算式,則會將其視為僅傳送作業。

調用 default 方法

閘道代理的介面也可能具有 default 方法,且從 5.3 版開始,框架會將 DefaultMethodInvokingMethodInterceptor 注入到代理中,以使用 java.lang.invoke.MethodHandle 方法而不是代理來調用 default 方法。來自 JDK 的介面 (例如 java.util.function.Function) 仍然可以用於閘道代理,但由於針對 JDK 類別的 MethodHandles.Lookup 實例化的內部 Java 安全性原因,無法調用其 default 方法。這些方法也可以使用方法上的明確 @Gateway 註解,或 @MessagingGateway 註解或 <gateway> XML 元件上的 proxyDefaultMethods 進行代理(遺失其實作邏輯,同時還原先前的閘道代理行為)。

錯誤處理

閘道調用可能會導致錯誤。依預設,下游發生的任何錯誤都會在閘道的方法調用時「按原樣」重新擲回。例如,考量以下簡單流程

gateway -> service-activator

如果服務啟動器調用的服務擲回 MyException(例如),框架會將其包裝在 MessagingException 中,並將傳遞至服務啟動器的訊息附加在 failedMessage 屬性中。因此,框架執行的任何記錄都有完整的失敗內容。依預設,當閘道攔截到例外狀況時,MyException 會解除包裝並擲回給呼叫者。您可以在閘道方法宣告上設定 throws 子句,以符合原因鏈中的特定例外狀況類型。例如,如果您想要使用下游錯誤原因的所有訊息傳遞資訊來攔截整個 MessagingException,您應該具有類似於以下內容的閘道方法

public interface MyGateway {

    void performProcess() throws MessagingException;

}

由於我們鼓勵 POJO 程式設計,因此您可能不希望將呼叫者暴露於訊息傳遞基礎結構。

如果您的閘道方法沒有 throws 子句,則閘道會走訪原因樹狀結構,尋找不是 MessagingExceptionRuntimeException。如果找不到,框架會擲回 MessagingException。如果先前討論中的 MyException 具有 SomeOtherException 的原因,且您的方法 throws SomeOtherException,則閘道會進一步解除包裝並將其擲回給呼叫者。

當宣告閘道時沒有 service-interface,則會使用內部框架介面 RequestReplyExchanger

考量以下範例

public interface RequestReplyExchanger {

	Message<?> exchange(Message<?> request) throws MessagingException;

}

在 5.0 版本之前,此 exchange 方法沒有 throws 子句,因此例外會被解包裝 (unwrapped)。如果您使用此介面並希望恢復先前的解包裝行為,請使用自訂的 service-interface,或者自行存取 MessagingExceptioncause

然而,您可能想要記錄錯誤而不是傳播它,或者您可能想要將例外視為有效的回覆(透過將其映射到符合呼叫者理解的某些「錯誤訊息」契約的訊息)。為了實現這一點,閘道器透過支援 error-channel 屬性,為專用於錯誤的訊息通道提供支援。在以下範例中,「transformer」從 Exception 建立回覆 Message

<int:gateway id="sampleGateway"
    default-request-channel="gatewayChannel"
    service-interface="foo.bar.SimpleGateway"
    error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

exceptionTransformer 可以是一個簡單的 POJO,它知道如何建立預期的錯誤回應物件。這會成為傳回給呼叫者的酬載 (payload)。如有必要,您可以在這樣的「錯誤流程」中執行更多複雜的操作。它可能涉及路由器(包括 Spring Integration 的 ErrorMessageExceptionTypeRouter)、篩選器等等。然而,大多數時候,一個簡單的 'transformer' 應該就足夠了。

或者,您可能只想記錄例外(或非同步地將其發送到某處)。如果您提供單向流程,則不會將任何內容傳回給呼叫者。如果您想完全抑制例外,您可以提供對全域 nullChannel 的參考(本質上是一種 /dev/null 方法)。最後,如上所述,如果沒有定義 error-channel,則例外會像往常一樣傳播。

當您使用 @MessagingGateway 註解時(請參閱 @MessagingGateway` Annotation),您可以使用 `errorChannel` 屬性。

從 5.0 版本開始,當您使用具有 void 回傳類型(單向流程)的閘道器方法時,error-channel 參考(如果提供)會填充到每個發送訊息的標準 errorChannel 標頭中。此功能允許下游非同步流程,基於標準 ExecutorChannel 配置(或 QueueChannel),覆寫預設的全域 errorChannel 例外發送行為。先前,您必須使用 @GatewayHeader 註解或 <header> 元素手動指定 errorChannel 標頭。對於具有非同步流程的 void 方法,error-channel 屬性會被忽略。相反地,錯誤訊息會被發送到預設的 errorChannel

透過簡單的 POJI 閘道器公開訊息系統提供了好處,但是「隱藏」底層訊息系統的真實性確實需要付出代價,因此有些事情您應該考慮。我們希望我們的 Java 方法盡可能快速地返回,而不會無限期地掛起,同時呼叫者正在等待它返回(無論是 void、回傳值還是拋出的 Exception)。當常規方法用作訊息系統前面的代理時,我們必須考慮底層訊息的潛在非同步性質。這表示閘道器啟動的訊息可能有可能被篩選器丟棄,並且永遠不會到達負責產生回覆的組件。某些服務啟動器方法可能會導致例外,因此不提供回覆(因為我們不產生 null 訊息)。換句話說,多種情況可能導致回覆訊息永遠不會到來。這在訊息系統中是很自然的。但是,請考慮對閘道器方法的影響。閘道器方法的輸入參數被併入訊息並向下游發送。回覆訊息將被轉換為閘道器方法的回傳值。因此,您可能想要確保對於每個閘道器呼叫,始終存在回覆訊息。否則,如果 reply-timeout 設定為負值,您的閘道器方法可能會永遠不會返回並無限期地掛起。處理這種情況的一種方法是使用非同步閘道器(在本節稍後說明)。另一種處理方法是依賴預設的 reply-timeout 作為 30 秒。這樣,閘道器掛起的時間不會超過 reply-timeout 指定的時間,如果該逾時時間已過,則會返回 'null'。最後,您可能想要考慮設定下游標誌,例如服務啟動器上的 'requires-reply' 或篩選器上的 'throw-exceptions-on-rejection'。本章的最後一節會更詳細地討論這些選項。
如果下游流程返回 ErrorMessage,則其 payload (Throwable) 會被視為常規的下游錯誤。如果配置了 error-channel,它會被發送到錯誤流程。否則,payload 會被拋給閘道器的呼叫者。同樣地,如果 error-channel 上的錯誤流程返回 ErrorMessage,則其 payload 會被拋給呼叫者。這同樣適用於任何具有 Throwable payload 的訊息。當您需要將 Exception 直接傳播給呼叫者時,這在非同步情況下可能很有用。為此,您可以返回 Exception(作為來自某些服務的 reply)或拋出它。通常,即使使用非同步流程,框架也會負責將下游流程拋出的例外傳播回閘道器。TCP Client-Server Multiplex 範例示範了將例外返回給呼叫者的兩種技術。它透過使用具有 group-timeoutaggregator(請參閱 Aggregator and Group Timeout)和丟棄流程上的 MessagingTimeoutException 回覆,模擬 socket IO 錯誤以等待執行緒。

閘道器逾時

閘道器具有兩個逾時屬性:requestTimeoutreplyTimeout。請求逾時僅在通道可以阻塞時適用(例如,已滿的有界 QueueChannel)。replyTimeout 值是閘道器等待回覆或返回 null 的時間長度。它預設為無限大。

逾時可以設定為閘道器上所有方法的預設值(defaultRequestTimeoutdefaultReplyTimeout),或在 MessagingGateway 介面註解上設定。個別方法可以覆寫這些預設值(在 <method/> 子元素中)或在 @Gateway 註解上覆寫。

從 5.0 版本開始,逾時可以定義為表達式,如下列範例所示

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

評估上下文具有 BeanResolver(使用 @someBean 來參考其他 bean),並且可以使用來自 #root 物件的 args 陣列屬性。有關此根物件的更多資訊,請參閱 表達式和「全域」標頭。使用 XML 配置時,逾時屬性可以是 long 值或 SpEL 表達式,如下列範例所示

<method name="someMethod" request-channel="someRequestChannel"
                      payload-expression="args[0]"
                      request-timeout="1000"
                      reply-timeout="args[1]">
</method>

非同步閘道器

作為一種模式,訊息閘道器提供了一種很好的方式來隱藏特定於訊息傳遞的代码,同時仍然公開訊息系統的全部功能。如 先前所述GatewayProxyFactoryBean 提供了一種方便的方式,透過服務介面公開代理,讓您可以基於 POJO 的方式存取訊息系統(基於您自己的領域中的物件、原始類型/字串或其他物件)。然而,當閘道器透過返回值的簡單 POJO 方法公開時,這意味著對於每個請求訊息(在調用方法時產生),都必須有一個回覆訊息(在方法返回時產生)。由於訊息系統本質上是非同步的,因此您可能無法始終保證「對於每個請求,始終會有回覆」的契約。Spring Integration 2.0 引入了對非同步閘道器的支援,當您可能不知道是否需要回覆或回覆需要多長時間才能到達時,它提供了一種方便的方式來啟動流程。

為了處理這些類型的場景,Spring Integration 使用 java.util.concurrent.Future 實例來支援非同步閘道器。

從 XML 配置來看,沒有任何變化,您仍然以與定義常規閘道器相同的方式定義非同步閘道器,如下列範例所示

<int:gateway id="mathService"
     service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
     default-request-channel="requestChannel"/>

然而,閘道器介面(服務介面)略有不同,如下所示

public interface MathServiceGateway {

  Future<Integer> multiplyByTwo(int i);

}

如前面的範例所示,閘道器方法的回傳類型是 Future。當 GatewayProxyFactoryBean 看到閘道器方法的回傳類型是 Future 時,它會立即切換到非同步模式,方法是使用 AsyncTaskExecutor。這就是差異的程度。對這種方法的呼叫始終會立即返回一個 Future 實例。然後,您可以按照自己的節奏與 Future 互動以取得結果、取消等等。此外,與任何其他 Future 實例的使用一樣,呼叫 get() 可能會顯示逾時、執行例外等等。以下範例示範如何使用從非同步閘道器返回的 Future

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult =  result.get(1000, TimeUnit.SECONDS);

有關更詳細的範例,請參閱 Spring Integration 範例中的 async-gateway 範例。

AsyncTaskExecutor

預設情況下,當提交內部 AsyncInvocationTask 實例給任何回傳類型為 Future 的閘道器方法時,GatewayProxyFactoryBean 使用 org.springframework.core.task.SimpleAsyncTaskExecutor。然而,<gateway/> 元素的配置中的 async-executor 屬性允許您提供對 Spring 應用程式上下文中可用的任何 java.util.concurrent.Executor 實作的參考。

(預設)SimpleAsyncTaskExecutor 同時支援 FutureCompletableFuture 回傳類型。請參閱 CompletableFuture。即使存在預設的 executor,提供外部的 executor 通常也很有用,這樣您就可以在日誌中識別其執行緒(使用 XML 時,執行緒名稱基於 executor 的 bean 名稱),如下列範例所示

@Bean
public AsyncTaskExecutor exec() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
    return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

如果您希望返回不同的 Future 實作,您可以提供自訂的 executor 或完全停用 executor,並從下游流程的回覆訊息酬載中返回 Future。若要停用 executor,請在 GatewayProxyFactoryBean 中將其設定為 null(透過使用 setAsyncTaskExecutor(null))。使用 XML 配置閘道器時,請使用 async-executor=""。當使用 @MessagingGateway 註解進行配置時,請使用類似於以下的程式碼

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}
如果回傳類型是特定的具體 Future 實作或某些其他不被配置的 executor 支援的子介面,則流程會在呼叫者的執行緒上執行,並且流程必須在回覆訊息酬載中返回所需的類型。

CompletableFuture

從 4.2 版本開始,閘道器方法現在可以返回 CompletableFuture<?>。返回此類型時有兩種操作模式

  • 當提供非同步 executor 且回傳類型正好是 CompletableFuture(而不是子類別)時,框架會在 executor 上執行任務,並立即將 CompletableFuture 返回給呼叫者。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) 用於建立 future。

  • 當非同步 executor 明確設定為 null 且回傳類型是 CompletableFuture 或回傳類型是 CompletableFuture 的子類別時,流程會在呼叫者的執行緒上調用。在這種情況下,預期下游流程會返回適當類型的 CompletableFuture

org.springframework.util.concurrent.ListenableFuture 已從 Spring Framework 6.0 開始棄用。現在建議遷移到 CompletableFuture,它提供類似的處理功能。

使用情境

在以下情境中,呼叫者執行緒立即返回 CompletableFuture<Invoice>,當下游流程回覆閘道器(使用 Invoice 物件)時,它會完成。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

在以下情境中,當下游流程將 CompletableFuture<Invoice> 作為回覆閘道器的酬載提供時,呼叫者執行緒會返回 CompletableFuture<Invoice>。當發票準備就緒時,其他一些程序必須完成 future。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
    async-executor="" />

在以下情境中,當下游流程將 CompletableFuture<Invoice> 作為回覆閘道器的酬載提供時,呼叫者執行緒會返回 CompletableFuture<Invoice>。當發票準備就緒時,其他一些程序必須完成 future。如果啟用 DEBUG 日誌記錄,則會發出日誌條目,指示非同步 executor 無法用於此情境。

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

CompletableFuture 實例可用於對回覆執行額外的操作,如下列範例所示

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

Reactor Mono

從 5.0 版本開始,GatewayProxyFactoryBean 允許將 Project Reactor 與閘道器介面方法一起使用,方法是使用 Mono<T> 回傳類型。內部的 AsyncInvocationTask 包裝在 Mono.fromCallable() 中。

Mono 可用於稍後檢索結果(類似於 Future<?>),或者您可以在結果返回到閘道器時透過調用您的 Consumer 從 dispatcher 中使用它。

Mono 不會立即被框架刷新 (flushed)。因此,底層訊息流程不會在閘道器方法返回之前啟動(就像使用 Future<?> Executor 任務一樣)。流程在 Mono 被訂閱時啟動。或者,Mono(作為「Composable」)可能是 Reactor 串流的一部分,其中 subscribe() 與整個 Flux 相關。以下範例示範如何使用 Project Reactor 建立閘道器
@MessagingGateway
public interface TestGateway {

    @Gateway(requestChannel = "multiplyChannel")
    Mono<Integer> multiply(Integer value);

}

@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
    return value * 2;
}

其中這樣的閘道器可以用於處理 Flux 資料的某些服務中

@Autowired
TestGateway testGateway;

public void hadnleFlux() {
    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(System.out::println);
}

另一個使用 Project Reactor 的範例是一個簡單的回呼情境,如下列範例所示

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

呼叫執行緒繼續執行,當流程完成時,會調用 handleInvoice()

另請參閱 Kotlin 協程 以取得更多資訊。

下游流程返回非同步類型

如上面的 AsyncTaskExecutor 節中所述,如果您希望某些下游組件返回具有非同步酬載(FutureMono 等)的訊息,則必須明確地將非同步 executor 設定為 null(或使用 XML 配置時設定為 "")。然後,流程會在呼叫者執行緒上調用,並且稍後可以檢索結果。

非同步 void 回傳類型

訊息閘道器方法可以這樣宣告

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = "sendAsyncChannel")
    @Async
    void sendAsync(String payload);

}

但是下游例外不會傳播回呼叫者。為了確保下游流程調用的非同步行為以及例外傳播到呼叫者,從 6.0 版本開始,框架提供了對 Future<Void>Mono<Void> 回傳類型的支援。使用案例與先前針對純 void 回傳類型描述的發送後不管 (send-and-forget) 行為類似,但不同之處在於流程執行是非同步發生的,並且返回的 Future(或 Mono)會根據 send 操作結果以 null 或例外情況完成。

如果 Future<Void> 是精確的下游流程回覆,則閘道器的 asyncExecutor 選項必須設定為 null(對於 @MessagingGateway 配置,則為 AnnotationConstants.NULL),並且 send 部分在生產者執行緒上執行。回覆部分取決於下游流程配置。這樣,由目標應用程式正確產生 Future<Void> 回覆。Mono 使用案例已經超出框架執行緒控制範圍,因此將 asyncExecutor 設定為 null 沒有意義。作為請求-回覆閘道器操作結果的 Mono<Void> 必須配置為閘道器方法的 Mono<?> 回傳類型。

閘道器在沒有收到回覆時的行為

先前解釋的,閘道器提供了一種方便的方式,透過 POJO 方法調用與訊息系統互動。然而,典型的常規方法調用(通常預期始終返回(即使有例外)),可能不總是一對一地映射到訊息交換(例如,回覆訊息可能不會到達 — 相當於方法不返回)。

本節的其餘部分涵蓋了各種情境以及如何使閘道器行為更可預測。可以配置某些屬性以使同步閘道器行為更可預測,但其中一些屬性可能並不總是像您預期的那樣工作。其中之一是 reply-timeout(在方法層級或閘道器層級的 default-reply-timeout)。我們檢查 reply-timeout 屬性,以了解它如何在各種情境中影響和不能影響同步閘道器的行為。我們檢查單執行緒情境(所有下游組件都透過直接通道連接)和多執行緒情境(例如,在下游的某個地方,您可能有一個可輪詢或執行器通道,它們會打破單執行緒邊界)。

長時間運行的下游程序

同步閘道器,單執行緒

如果下游組件仍在運行(可能是因為無限迴圈或慢速服務),則設定 reply-timeout 無效,並且閘道器方法調用不會返回,直到下游服務退出(透過返回或拋出例外)。

同步閘道器,多執行緒

如果下游組件在多執行緒訊息流程中仍在運行(可能是因為無限迴圈或慢速服務),則設定 reply-timeout 有效,它允許閘道器方法調用在達到逾時時間後返回,因為 GatewayProxyFactoryBean 會輪詢回覆通道,等待訊息直到逾時時間到期。但是,如果在產生實際回覆之前已達到逾時時間,則可能會導致從閘道器方法返回 'null'。您應該理解,回覆訊息(如果產生)是在閘道器方法調用可能已返回之後發送到回覆通道的,因此您必須意識到這一點並在設計流程時考慮到這一點。

另請參閱 errorOnTimeout 屬性,以便在發生逾時時拋出 MessageTimeoutException 而不是返回 null

下游組件返回 'null'

同步閘道器 — 單執行緒

如果下游組件返回 'null' 並且 reply-timeout 已配置為負值,則閘道器方法調用會無限期地掛起,除非在可能返回 'null' 的下游組件(例如,服務啟動器)上設定了 requires-reply 屬性。在這種情況下,將拋出例外並傳播到閘道器。

同步閘道器 — 多執行緒

行為與前一種情況相同。

下游組件回傳簽名為 'void',而閘道器方法簽名為非 void

同步閘道器 — 單執行緒

如果下游組件返回 'void' 並且 reply-timeout 已配置為負值,則閘道器方法調用會無限期地掛起。

同步閘道器 — 多執行緒

行為與前一種情況相同。

下游組件導致執行階段例外

同步閘道器 — 單執行緒

如果下游組件拋出執行階段例外,則例外會透過錯誤訊息傳播回閘道器並重新拋出。

同步閘道器 — 多執行緒

行為與前一種情況相同。

您應該理解,預設情況下,reply-timeout 是無界的。因此,如果您將 reply-timeout 設定為負值,則您的閘道器方法調用可能會無限期地掛起。因此,為了確保您分析您的流程,並且即使只有很小的可能性發生這些情境之一,您也應該將 reply-timeout 屬性設定為「安全」值。預設值為 30 秒。更好的是,您可以將下游組件的 requires-reply 屬性設定為 'true',以確保及時的回應,就像在該下游組件在內部返回 null 時立即拋出例外所產生的一樣。但是,您也應該意識到,在某些情境下(請參閱 第一個情境),reply-timeout 沒有幫助。這表示分析您的訊息流程並決定何時使用同步閘道器而不是非同步閘道器也很重要。如 先前所述,後一種情況是定義返回 Future 實例的閘道器方法的問題。然後,您可以保證收到該回傳值,並且您可以更精細地控制調用的結果。此外,當處理路由器時,您應該記住,如果路由器無法解析特定通道,則將 resolution-required 屬性設定為 'true' 會導致路由器拋出例外。同樣地,當處理篩選器時,您可以設定 throw-exception-on-rejection 屬性。在這兩種情況下,產生的流程行為都像是它包含具有 'requires-reply' 屬性的服務啟動器。換句話說,它有助於確保閘道器方法調用的及時回應。
您應該理解,計時器在執行緒返回到閘道器時開始 — 也就是說,當流程完成或訊息被移交給另一個執行緒時。屆時,呼叫執行緒開始等待回覆。如果流程完全是同步的,則回覆會立即可用。對於非同步流程,執行緒最多等待此時間。

從 6.2 版本開始,內部 MethodInvocationGateway (MessagingGatewaySupport 的擴展) 的 errorOnTimeout 屬性在 @MessagingGatewayGatewayEndpointSpec 上公開。此選項的含義與 Endpoint Summary 章節末尾解釋的任何入站閘道器完全相同。換句話說,將此選項設定為 true 將導致從發送和接收閘道器操作中拋出 MessageTimeoutException,而不是在接收逾時時間耗盡時返回 null

請參閱 Java DSL 章節中的 IntegrationFlow 作為閘道器,以了解透過 IntegrationFlow 定義閘道器的選項。