訊息發布

(面向切面程式設計)AOP 訊息發布功能可讓您建構並傳送訊息,作為方法調用的副產品。例如,假設您有一個元件,並且每次此元件的狀態變更時,您都希望收到訊息通知。傳送此類通知的最簡單方法是將訊息傳送到專用通道,但是您將如何將變更物件狀態的方法調用連接到訊息傳送過程,以及通知訊息應如何結構化?AOP 訊息發布功能透過組態驅動方法處理這些職責。

訊息發布組態

Spring Integration 提供兩種方法:XML 組態和註解驅動 (Java) 組態。

使用 @Publisher 註解的註解驅動組態

註解驅動方法可讓您使用 @Publisher 註解註解任何方法,以指定 'channel' 屬性。從 5.1 版開始,若要開啟此功能,您必須在某些 @Configuration 類別上使用 @EnablePublisher 註解。請參閱組態和 @EnableIntegration以取得更多資訊。訊息是從方法調用的傳回值建構,並傳送到 'channel' 屬性指定的通道。為了進一步管理訊息結構,您還可以組合使用 @Payload@Header 註解。

在內部,Spring Integration 的此訊息發布功能同時使用 Spring AOP(透過定義 PublisherAnnotationAdvisor)和 Spring 運算式語言 (SpEL),讓您對其發布的 Message 的結構具有相當大的彈性和控制權。

PublisherAnnotationAdvisor 定義並繫結以下變數

  • #return:繫結至傳回值,讓您參考它或其屬性(例如,#return.something,其中 'something' 是繫結至 #return 的物件的屬性)

  • #exception:如果方法調用擲回例外狀況,則繫結至例外狀況

  • #args:繫結至方法引數,以便您可以依名稱擷取個別引數(例如,#args.fname

考慮以下範例

@Publisher
public String defaultPayload(String fname, String lname) {
  return fname + " " + lname;
}

在上述範例中,訊息是使用以下結構建構的

  • 訊息 Payload 是方法的傳回類型和值。這是預設值。

  • 新建構的訊息會傳送到使用註解後處理器設定的預設發布者通道(稍後在本節中介紹)。

以下範例與上述範例相同,但它不使用預設發布通道

@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
  return fname + " " + lname;
}

我們不是使用預設發布通道,而是透過設定 @Publisher 註解的 'channel' 屬性來指定發布通道。我們還新增了 @Header 註解,這會導致名為 'last' 的訊息標頭具有與 'lname' 方法參數相同的值。該標頭會新增至新建構的訊息。

以下範例幾乎與上述範例相同

@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
  return fname + " " + lname;
}

唯一的區別是我們在方法上使用 @Payload 註解來明確指定應將方法的傳回值用作訊息的 Payload。

以下範例透過在 @Payload 註解中使用 Spring 運算式語言來擴充先前的組態,以進一步指示框架如何建構訊息

@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
  return fname + " " + lname;
}

在上述範例中,訊息是方法調用的傳回值和 'lname' 輸入引數的串連。名為 'x' 的訊息標頭的值由 'num' 輸入引數決定。該標頭會新增至新建構的訊息。

@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
  return fname + " " + lname;
}

在上述範例中,您會看到 @Payload 註解的另一個用法。在這裡,我們註解了一個方法引數,該引數會變成新建構訊息的 Payload。

與 Spring 中的大多數其他註解驅動功能一樣,您需要註冊後處理器 (PublisherAnnotationBeanPostProcessor)。以下範例示範如何執行此操作

<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

為了更簡潔的組態,您可以改為使用命名空間支援,如下列範例所示

<int:annotation-config>
    <int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>

對於 Java 組態,您必須使用 @EnablePublisher 註解,如下列範例所示

@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
    ...
}

從 5.1.3 版開始,<int:enable-publisher> 元件以及 @EnablePublisher 註解都具有 proxy-target-classorder 屬性,用於調整 ProxyFactory 組態。

與其他 Spring 註解 (@Component@Scheduled 等) 類似,您也可以將 @Publisher 用作 Meta-Annotation。這表示您可以定義自己的註解,這些註解的處理方式與 @Publisher 本身相同。以下範例示範如何執行此操作

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}

在上述範例中,我們定義了 @Audit 註解,該註解本身使用 @Publisher 進行註解。另請注意,您可以在 Meta-Annotation 上定義 channel 屬性,以封裝訊息在此註解內傳送的位置。現在您可以使用 @Audit 註解註解任何方法,如下列範例所示

@Audit
public String test() {
    return "Hello";
}

在上述範例中,每次調用 test() 方法都會產生一個訊息,其 Payload 是從其傳回值建立的。每個訊息都會傳送到名為 auditChannel 的通道。此技術的優點之一是,您可以避免在多個註解中重複相同的通道名稱。您還可以在您自己的(可能特定於網域的)註解和框架提供的註解之間提供一層間接性。

您也可以註解類別,這可讓您將此註解的屬性套用至該類別的每個公用方法,如下列範例所示

@Audit
static class BankingOperationsImpl implements BankingOperations {

  public String debit(String amount) {
     . . .

  }

  public String credit(String amount) {
     . . .
  }

}

使用 <publishing-interceptor> 元素的基於 XML 的方法

基於 XML 的方法可讓您將相同的基於 AOP 的訊息發布功能組態為 MessagePublishingInterceptor 的基於命名空間的組態。相較於註解驅動方法,它當然有一些優點,因為它可讓您使用 AOP 切入點運算式,因此可能會一次攔截多個方法,或攔截和發布您沒有原始碼的方法。

若要使用 XML 組態訊息發布,您只需要執行以下兩件事

  • 透過使用 <publishing-interceptor> XML 元素,為 MessagePublishingInterceptor 提供組態。

  • 提供 AOP 組態,以將 MessagePublishingInterceptor 套用至受管理物件。

以下範例示範如何組態 publishing-interceptor 元素

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
  <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" value="something"/>
  </method>
  <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" expression="'something'.toUpperCase()"/>
  </method>
  <method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>

<publishing-interceptor> 組態看起來與基於註解的方法非常相似,並且它也使用了 Spring 運算式語言的強大功能。

在上述範例中,執行 testBeanecho 方法會呈現具有以下結構的 Message

  • Message Payload 的類型為 String,內容如下:Echoing: [value],其中 value 是執行方法傳回的值。

  • Message 具有一個名為 things 的標頭和一個值 something

  • Message 會傳送到 echoChannel

第二種方法與第一種方法非常相似。在這裡,每個以 'repl' 開頭的方法都會呈現具有以下結構的 Message

  • Message Payload 與前一個範例中的相同。

  • Message 具有一個名為 things 的標頭,其值是 SpEL 運算式 'something'.toUpperCase() 的結果。

  • Message 會傳送到 echoChannel

第二種方法,將任何以 echoDef 開頭的方法的執行映射,產生具有以下結構的 Message

  • Message Payload 是執行方法傳回的值。

  • 由於未提供 channel 屬性,因此 Message 會傳送到 publisher 定義的 defaultChannel

對於簡單的映射規則,您可以依賴 publisher 預設值,如下列範例所示

<publishing-interceptor id="anotherInterceptor"/>

上述範例將符合切入點運算式的每個方法的傳回值映射到 Payload,並傳送到 default-channel。如果您未指定 defaultChannel(如上述範例未執行),則訊息會傳送到全域 nullChannel(相當於 /dev/null)。

非同步發布

發布發生在與元件執行相同的執行緒中。因此,預設情況下,它是同步的。這表示整個訊息流程必須等到發布者的流程完成。但是,開發人員通常希望完全相反:使用此訊息發布功能來啟動非同步流程。例如,您可能會託管接收遠端請求的服務(HTTP、WS 等)。您可能想要在內部將此請求傳送到可能需要一段時間的流程中。但是,您可能也想要立即回覆使用者。因此,您可以使用 'output-channel' 或 'replyChannel' 標頭將簡單的確認式回覆傳送回呼叫者,同時使用訊息發布者功能來啟動複雜的流程,而不是傳送輸入請求以供輸出通道處理(傳統方式)。

以下範例中的服務接收複雜的 Payload(需要進一步傳送以進行處理),但它也需要以簡單的確認回覆呼叫者

public String echo(Object complexPayload) {
     return "ACK";
}

因此,我們不是將複雜的流程連接到輸出通道,而是改為使用訊息發布功能。我們將其組態為透過使用服務方法的輸入引數(如前一個範例所示)來建立新訊息,並將其傳送到 'localProcessChannel'。為了確保此流程是非同步的,我們需要做的就是將其傳送到任何類型的非同步通道(下一個範例中的 ExecutorChannel)。以下範例示範如何進行非同步 publishing-interceptor

<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleService" class="test.SampleService"/>

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(sampleService)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
  <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
    <int:header name="sample_header" expression="'some sample value'"/>
  </int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
  <int:dispatcher task-executor="executor"/>
</int:channel>

<task:executor id="executor" pool-size="5"/>

處理此類型案例的另一種方法是使用 Wire Tap。請參閱Wire Tap

根據排程觸發器產生和發布訊息

在先前的章節中,我們探討了訊息發布功能,該功能建構和發布訊息作為方法調用的副產品。但是,在這些情況下,您仍然負責調用方法。Spring Integration 2.0 在 'inbound-channel-adapter' 元素上新增了對排程訊息產生者和發布者的支援,並新增了 'expression' 屬性。您可以根據多個觸發器進行排程,其中任何一個觸發器都可以在 'poller' 元素上組態。目前,我們支援 cronfixed-ratefixed-delay 以及您實作並由 'trigger' 屬性值參考的任何自訂觸發器。

如先前所述,排程產生者和發布者的支援是透過 <inbound-channel-adapter> XML 元素提供的。考慮以下範例

<int:inbound-channel-adapter id="fixedDelayProducer"
       expression="'fixedDelayTest'"
       channel="fixedDelayChannel">
    <int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>

上述範例建立了一個輸入通道配接器,該配接器建構一個 Message,其 Payload 是 expression 屬性中定義的運算式的結果。每次發生 fixed-delay 屬性指定的延遲時,都會建立並傳送此類訊息。

以下範例與上述範例類似,不同之處在於它使用了 fixed-rate 屬性

<int:inbound-channel-adapter id="fixedRateProducer"
       expression="'fixedRateTest'"
       channel="fixedRateChannel">
    <int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>

fixed-rate 屬性可讓您以固定速率傳送訊息(從每個任務的開始時間開始測量)。

以下範例示範如何套用 Cron 觸發器,其值在 cron 屬性中指定

<int:inbound-channel-adapter id="cronProducer"
       expression="'cronTest'"
       channel="cronChannel">
    <int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>

以下範例示範如何在訊息中插入其他標頭

<int:inbound-channel-adapter id="headerExpressionsProducer"
       expression="'headerExpressionsTest'"
       channel="headerExpressionsChannel"
       auto-startup="false">
    <int:poller fixed-delay="5000"/>
    <int:header name="foo" expression="6 * 7"/>
    <int:header name="bar" value="x"/>
</int:inbound-channel-adapter>

其他訊息標頭可以採用純量值或評估 Spring 運算式的結果。

如果您需要實作自己的自訂觸發器,您可以使用 trigger 屬性來提供對實作 org.springframework.scheduling.Trigger 介面的任何 Spring 組態 Bean 的參考。以下範例示範如何執行此操作

<int:inbound-channel-adapter id="triggerRefProducer"
       expression="'triggerRefTest'" channel="triggerRefChannel">
    <int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>

<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
    <beans:constructor-arg value="9999"/>
</beans:bean>