延遲器
延遲器是一個簡單的端點,可讓訊息流程延遲一段特定的時間間隔。當訊息被延遲時,原始發送者不會被封鎖。相反地,延遲的訊息會排程到 org.springframework.scheduling.TaskScheduler
的實例中,以便在延遲時間過後發送到輸出通道。即使對於相當長的延遲,這種方法也是可擴展的,因為它不會導致大量發送者執行緒被封鎖。相反地,在典型情況下,執行緒池會用於釋放訊息的實際執行。本節包含設定延遲器的幾個範例。
設定延遲器
<delayer>
元素用於延遲兩個訊息通道之間的訊息流程。與其他端點一樣,您可以提供 'input-channel' 和 'output-channel' 屬性,但延遲器也具有 'default-delay' 和 'expression' 屬性(以及 'expression' 元素),這些屬性決定每個訊息應延遲的毫秒數。以下範例將所有訊息延遲三秒
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
如果您需要決定每個訊息的延遲時間,您也可以使用 'expression' 屬性提供 SpEL 運算式,如下列運算式所示
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay(d -> d
.messageGroupId("delayer.messageGroupId")
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay {
messageGroupId("delayer.messageGroupId")
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
在先前的範例中,三秒延遲僅在運算式針對給定的輸入訊息評估為 null 時才適用。如果您只想將延遲套用至具有有效運算式評估結果的訊息,則可以使用 0
的 'default-delay'(預設值)。對於任何延遲為 0
(或更少)的訊息,訊息會立即在呼叫執行緒上傳送。
XML 解析器使用 <beanName>.messageGroupId 的訊息群組 ID。 |
延遲處理器支援運算式評估結果,這些結果表示以毫秒為單位的時間間隔(任何 Object ,其 toString() 方法產生可以解析為 Long 的值)以及表示絕對時間的 java.util.Date 實例。在第一種情況下,毫秒數是從目前時間開始計算(例如,值 5000 會將訊息延遲至少五秒,從延遲器接收訊息的時間開始算起)。使用 Date 實例,訊息在該 Date 物件表示的時間之前不會釋放。等於非正延遲或過去日期的值不會產生延遲。相反地,它會在原始發送者的執行緒上直接傳送到輸出通道。如果運算式評估結果不是 Date 且無法解析為 Long ,則會套用預設延遲(如果有的話 — 預設值為 0 )。 |
運算式評估可能會因各種原因拋出評估例外狀況,包括無效的運算式或其他條件。依預設,這些例外狀況會被忽略(雖然會在 DEBUG 層級記錄),且延遲器會回復為預設延遲(如果有的話)。您可以透過設定 ignore-expression-failures 屬性來修改此行為。依預設,此屬性設定為 true ,且延遲器行為如先前所述。但是,如果您不希望忽略運算式評估例外狀況,並將它們拋給延遲器的呼叫者,請將 ignore-expression-failures 屬性設定為 false 。 |
在先前的範例中,延遲運算式指定為
因此,如果標頭有可能被省略,且您想要回復為預設延遲,則通常更有效率(且建議)使用索引器語法而不是點屬性存取器語法,因為偵測 null 比捕捉例外狀況更快。 |
延遲器委派給 Spring 的 TaskScheduler
抽象的實例。延遲器使用的預設排程器是 Spring Integration 在啟動時提供的 ThreadPoolTaskScheduler
實例。請參閱 設定任務排程器。如果您想要委派給不同的排程器,您可以透過延遲器元素的 'scheduler' 屬性提供參考,如下列範例所示
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果您設定外部 ThreadPoolTaskScheduler ,您可以在此屬性上設定 waitForTasksToCompleteOnShutdown = true 。它允許成功完成已處於執行狀態(釋放訊息)的 'delay' 任務,當應用程式關閉時。在 Spring Integration 2.2 之前,此屬性在 <delayer> 元素上可用,因為 DelayHandler 可以在背景建立自己的排程器。自 2.2 以來,延遲器需要外部排程器實例,且 waitForTasksToCompleteOnShutdown 已被刪除。您應該使用排程器自己的設定。 |
ThreadPoolTaskScheduler 具有屬性 errorHandler ,可以使用 org.springframework.util.ErrorHandler 的某些實作注入。此處理器允許處理來自排程任務執行緒的 Exception ,該執行緒傳送延遲訊息。依預設,它使用 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler ,您可以在記錄中看到堆疊追蹤。您可能會考慮使用 org.springframework.integration.channel.MessagePublishingErrorHandler ,它將 ErrorMessage 傳送到 error-channel ,從失敗訊息的標頭或預設 error-channel 。此錯誤處理是在交易回復(如果存在)後執行。請參閱 釋放失敗。 |
延遲器和訊息儲存區
DelayHandler
將延遲的訊息持久儲存到提供的 MessageStore
中的訊息群組。('groupId' 基於 <delayer>
元素的必要 'id' 屬性。另請參閱 DelayHandler.setMessageGroupId(String)
。)延遲的訊息會在排程任務立即從 MessageStore
中移除,然後 DelayHandler
將訊息傳送到 output-channel
。如果提供的 MessageStore
是持久性的(例如 JdbcMessageStore
),它提供在應用程式關閉時不遺失訊息的能力。在應用程式啟動後,DelayHandler
會從其在 MessageStore
中的訊息群組讀取訊息,並根據訊息的原始到達時間(如果延遲是數值)重新排程它們。對於延遲標頭是 Date
的訊息,重新排程時會使用該 Date
。如果延遲的訊息在 MessageStore
中保留的時間超過其 'delay',則會在啟動後立即傳送。messageGroupId
是必要的,且不能依賴可以產生的 DelayHandler
bean 名稱。這樣,在應用程式重新啟動後,DelayHandler
可能會獲得新的產生 bean 名稱。因此,延遲的訊息可能會從重新排程中遺失,因為它們的群組不再由應用程式管理。
<delayer>
可以使用以下兩個互斥元素之一來豐富:<transactional>
和 <advice-chain>
。這些 AOP advices 的 List
會套用至代理的內部 DelayHandler.ReleaseMessageHandler
,它負責在延遲後在排程任務的 Thread
上釋放訊息。例如,當下游訊息流程拋出例外狀況,且 ReleaseMessageHandler
的交易回復時,可能會使用它。在這種情況下,延遲的訊息會保留在持久性 MessageStore
中。您可以在 <advice-chain>
中使用任何自訂 org.aopalliance.aop.Advice
實作。<transactional>
元素定義一個簡單的 advice 鏈,其中僅包含交易 advice。以下範例顯示 <delayer>
內的 advice-chain
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
DelayHandler
可以匯出為 JMX MBean
,其中包含受管理的操作(getDelayedMessageCount
和 reschedulePersistedMessages
),這允許在執行階段重新排程延遲的持久性訊息 — 例如,如果 TaskScheduler
先前已停止。這些操作可以透過 Control Bus
命令調用,如下列範例所示
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
有關訊息儲存區、JMX 和控制匯流排的更多資訊,請參閱 系統管理。 |
從 5.3.7 版開始,如果交易在將訊息儲存到 MessageStore
時處於活動狀態,則釋放任務會在 TransactionSynchronization.afterCommit()
回呼中排程。這對於防止競爭條件是必要的,在競爭條件下,排程的釋放可能會在交易提交之前執行,且找不到訊息。在這種情況下,訊息將在延遲後或在交易提交後釋放,以較晚者為準。
釋放失敗
從 5.0.8 版開始,延遲器上有兩個新的屬性
-
maxAttempts
(預設值 5) -
retryDelay
(預設值 1 秒)
當訊息被釋放時,如果下游流程失敗,則會在 retryDelay
後嘗試釋放。如果達到 maxAttempts
,則訊息會被捨棄(除非釋放是交易性的,在這種情況下,訊息將保留在儲存區中,但將不再排程釋放,直到應用程式重新啟動,或調用 reschedulePersistedMessages()
方法,如上所述)。
此外,您可以設定 delayedMessageErrorChannel
;當釋放失敗時,ErrorMessage
會傳送到該通道,其中例外狀況作為 payload,並具有 originalMessage
屬性。ErrorMessage
包含標頭 IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT
,其中包含目前計數。
如果錯誤流程消耗了錯誤訊息並正常結束,則不會採取進一步的動作;如果釋放是交易性的,則交易將會提交,且訊息會從儲存區中刪除。如果錯誤流程拋出例外狀況,則將重試釋放,直到達到 maxAttempts
,如上所述。