訊息發布
(面向切面程式設計)AOP 訊息發布功能可讓您建構並傳送訊息,作為方法調用的副產品。例如,假設您有一個元件,並且每次此元件的狀態變更時,您都希望收到訊息通知。傳送此類通知的最簡單方法是將訊息傳送到專用通道,但是您將如何將變更物件狀態的方法調用連接到訊息傳送過程,以及通知訊息應如何結構化?AOP 訊息發布功能透過組態驅動方法處理這些職責。
訊息發布組態
Spring Integration 提供兩種方法:XML 組態和註解驅動 (Java) 組態。
使用 @Publisher
註解的註解驅動組態
註解驅動方法可讓您使用 @Publisher
註解註解任何方法,以指定 'channel' 屬性。從 5.1 版開始,若要開啟此功能,您必須在某些 @Configuration
類別上使用 @EnablePublisher
註解。請參閱組態和 @EnableIntegration
以取得更多資訊。訊息是從方法調用的傳回值建構,並傳送到 'channel' 屬性指定的通道。為了進一步管理訊息結構,您還可以組合使用 @Payload
和 @Header
註解。
在內部,Spring Integration 的此訊息發布功能同時使用 Spring AOP(透過定義 PublisherAnnotationAdvisor
)和 Spring 運算式語言 (SpEL),讓您對其發布的 Message
的結構具有相當大的彈性和控制權。
PublisherAnnotationAdvisor
定義並繫結以下變數
-
#return
:繫結至傳回值,讓您參考它或其屬性(例如,#return.something
,其中 'something' 是繫結至#return
的物件的屬性) -
#exception
:如果方法調用擲回例外狀況,則繫結至例外狀況 -
#args
:繫結至方法引數,以便您可以依名稱擷取個別引數(例如,#args.fname
)
考慮以下範例
@Publisher
public String defaultPayload(String fname, String lname) {
return fname + " " + lname;
}
在上述範例中,訊息是使用以下結構建構的
-
訊息 Payload 是方法的傳回類型和值。這是預設值。
-
新建構的訊息會傳送到使用註解後處理器設定的預設發布者通道(稍後在本節中介紹)。
以下範例與上述範例相同,但它不使用預設發布通道
@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
return fname + " " + lname;
}
我們不是使用預設發布通道,而是透過設定 @Publisher
註解的 'channel' 屬性來指定發布通道。我們還新增了 @Header
註解,這會導致名為 'last' 的訊息標頭具有與 'lname' 方法參數相同的值。該標頭會新增至新建構的訊息。
以下範例幾乎與上述範例相同
@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
return fname + " " + lname;
}
唯一的區別是我們在方法上使用 @Payload
註解來明確指定應將方法的傳回值用作訊息的 Payload。
以下範例透過在 @Payload
註解中使用 Spring 運算式語言來擴充先前的組態,以進一步指示框架如何建構訊息
@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
return fname + " " + lname;
}
在上述範例中,訊息是方法調用的傳回值和 'lname' 輸入引數的串連。名為 'x' 的訊息標頭的值由 'num' 輸入引數決定。該標頭會新增至新建構的訊息。
@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
return fname + " " + lname;
}
在上述範例中,您會看到 @Payload
註解的另一個用法。在這裡,我們註解了一個方法引數,該引數會變成新建構訊息的 Payload。
與 Spring 中的大多數其他註解驅動功能一樣,您需要註冊後處理器 (PublisherAnnotationBeanPostProcessor
)。以下範例示範如何執行此操作
<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>
為了更簡潔的組態,您可以改為使用命名空間支援,如下列範例所示
<int:annotation-config>
<int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>
對於 Java 組態,您必須使用 @EnablePublisher
註解,如下列範例所示
@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
...
}
從 5.1.3 版開始,<int:enable-publisher>
元件以及 @EnablePublisher
註解都具有 proxy-target-class
和 order
屬性,用於調整 ProxyFactory
組態。
與其他 Spring 註解 (@Component
、@Scheduled
等) 類似,您也可以將 @Publisher
用作 Meta-Annotation。這表示您可以定義自己的註解,這些註解的處理方式與 @Publisher
本身相同。以下範例示範如何執行此操作
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}
在上述範例中,我們定義了 @Audit
註解,該註解本身使用 @Publisher
進行註解。另請注意,您可以在 Meta-Annotation 上定義 channel
屬性,以封裝訊息在此註解內傳送的位置。現在您可以使用 @Audit
註解註解任何方法,如下列範例所示
@Audit
public String test() {
return "Hello";
}
在上述範例中,每次調用 test()
方法都會產生一個訊息,其 Payload 是從其傳回值建立的。每個訊息都會傳送到名為 auditChannel
的通道。此技術的優點之一是,您可以避免在多個註解中重複相同的通道名稱。您還可以在您自己的(可能特定於網域的)註解和框架提供的註解之間提供一層間接性。
您也可以註解類別,這可讓您將此註解的屬性套用至該類別的每個公用方法,如下列範例所示
@Audit
static class BankingOperationsImpl implements BankingOperations {
public String debit(String amount) {
. . .
}
public String credit(String amount) {
. . .
}
}
使用 <publishing-interceptor>
元素的基於 XML 的方法
基於 XML 的方法可讓您將相同的基於 AOP 的訊息發布功能組態為 MessagePublishingInterceptor
的基於命名空間的組態。相較於註解驅動方法,它當然有一些優點,因為它可讓您使用 AOP 切入點運算式,因此可能會一次攔截多個方法,或攔截和發布您沒有原始碼的方法。
若要使用 XML 組態訊息發布,您只需要執行以下兩件事
-
透過使用
<publishing-interceptor>
XML 元素,為MessagePublishingInterceptor
提供組態。 -
提供 AOP 組態,以將
MessagePublishingInterceptor
套用至受管理物件。
以下範例示範如何組態 publishing-interceptor
元素
<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
<method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" value="something"/>
</method>
<method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
<header name="things" expression="'something'.toUpperCase()"/>
</method>
<method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>
<publishing-interceptor>
組態看起來與基於註解的方法非常相似,並且它也使用了 Spring 運算式語言的強大功能。
在上述範例中,執行 testBean
的 echo
方法會呈現具有以下結構的 Message
-
Message
Payload 的類型為String
,內容如下:Echoing: [value]
,其中value
是執行方法傳回的值。 -
Message
具有一個名為things
的標頭和一個值something
。 -
Message
會傳送到echoChannel
。
第二種方法與第一種方法非常相似。在這裡,每個以 'repl' 開頭的方法都會呈現具有以下結構的 Message
-
Message
Payload 與前一個範例中的相同。 -
Message
具有一個名為things
的標頭,其值是 SpEL 運算式'something'.toUpperCase()
的結果。 -
Message
會傳送到echoChannel
。
第二種方法,將任何以 echoDef
開頭的方法的執行映射,產生具有以下結構的 Message
-
Message
Payload 是執行方法傳回的值。 -
由於未提供
channel
屬性,因此Message
會傳送到publisher
定義的defaultChannel
。
對於簡單的映射規則,您可以依賴 publisher
預設值,如下列範例所示
<publishing-interceptor id="anotherInterceptor"/>
上述範例將符合切入點運算式的每個方法的傳回值映射到 Payload,並傳送到 default-channel
。如果您未指定 defaultChannel
(如上述範例未執行),則訊息會傳送到全域 nullChannel
(相當於 /dev/null
)。
非同步發布
發布發生在與元件執行相同的執行緒中。因此,預設情況下,它是同步的。這表示整個訊息流程必須等到發布者的流程完成。但是,開發人員通常希望完全相反:使用此訊息發布功能來啟動非同步流程。例如,您可能會託管接收遠端請求的服務(HTTP、WS 等)。您可能想要在內部將此請求傳送到可能需要一段時間的流程中。但是,您可能也想要立即回覆使用者。因此,您可以使用 'output-channel' 或 'replyChannel' 標頭將簡單的確認式回覆傳送回呼叫者,同時使用訊息發布者功能來啟動複雜的流程,而不是傳送輸入請求以供輸出通道處理(傳統方式)。
以下範例中的服務接收複雜的 Payload(需要進一步傳送以進行處理),但它也需要以簡單的確認回覆呼叫者
public String echo(Object complexPayload) {
return "ACK";
}
因此,我們不是將複雜的流程連接到輸出通道,而是改為使用訊息發布功能。我們將其組態為透過使用服務方法的輸入引數(如前一個範例所示)來建立新訊息,並將其傳送到 'localProcessChannel'。為了確保此流程是非同步的,我們需要做的就是將其傳送到任何類型的非同步通道(下一個範例中的 ExecutorChannel
)。以下範例示範如何進行非同步 publishing-interceptor
<int:service-activator input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>
<bean id="sampleService" class="test.SampleService"/>
<aop:config>
<aop:advisor advice-ref="interceptor" pointcut="bean(sampleService)" />
</aop:config>
<int:publishing-interceptor id="interceptor" >
<int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
<int:header name="sample_header" expression="'some sample value'"/>
</int:method>
</int:publishing-interceptor>
<int:channel id="localProcessChannel">
<int:dispatcher task-executor="executor"/>
</int:channel>
<task:executor id="executor" pool-size="5"/>
處理此類型案例的另一種方法是使用 Wire Tap。請參閱Wire Tap。
根據排程觸發器產生和發布訊息
在先前的章節中,我們探討了訊息發布功能,該功能建構和發布訊息作為方法調用的副產品。但是,在這些情況下,您仍然負責調用方法。Spring Integration 2.0 在 'inbound-channel-adapter' 元素上新增了對排程訊息產生者和發布者的支援,並新增了 'expression' 屬性。您可以根據多個觸發器進行排程,其中任何一個觸發器都可以在 'poller' 元素上組態。目前,我們支援 cron
、fixed-rate
、fixed-delay
以及您實作並由 'trigger' 屬性值參考的任何自訂觸發器。
如先前所述,排程產生者和發布者的支援是透過 <inbound-channel-adapter>
XML 元素提供的。考慮以下範例
<int:inbound-channel-adapter id="fixedDelayProducer"
expression="'fixedDelayTest'"
channel="fixedDelayChannel">
<int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>
上述範例建立了一個輸入通道配接器,該配接器建構一個 Message
,其 Payload 是 expression
屬性中定義的運算式的結果。每次發生 fixed-delay
屬性指定的延遲時,都會建立並傳送此類訊息。
以下範例與上述範例類似,不同之處在於它使用了 fixed-rate
屬性
<int:inbound-channel-adapter id="fixedRateProducer"
expression="'fixedRateTest'"
channel="fixedRateChannel">
<int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>
fixed-rate
屬性可讓您以固定速率傳送訊息(從每個任務的開始時間開始測量)。
以下範例示範如何套用 Cron 觸發器,其值在 cron
屬性中指定
<int:inbound-channel-adapter id="cronProducer"
expression="'cronTest'"
channel="cronChannel">
<int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>
以下範例示範如何在訊息中插入其他標頭
<int:inbound-channel-adapter id="headerExpressionsProducer"
expression="'headerExpressionsTest'"
channel="headerExpressionsChannel"
auto-startup="false">
<int:poller fixed-delay="5000"/>
<int:header name="foo" expression="6 * 7"/>
<int:header name="bar" value="x"/>
</int:inbound-channel-adapter>
其他訊息標頭可以採用純量值或評估 Spring 運算式的結果。
如果您需要實作自己的自訂觸發器,您可以使用 trigger
屬性來提供對實作 org.springframework.scheduling.Trigger
介面的任何 Spring 組態 Bean 的參考。以下範例示範如何執行此操作
<int:inbound-channel-adapter id="triggerRefProducer"
expression="'triggerRefTest'" channel="triggerRefChannel">
<int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>
<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
<beans:constructor-arg value="9999"/>
</beans:bean>