輪詢器

本節說明輪詢在 Spring Integration 中的運作方式。

輪詢消費者

當訊息端點 (通道配接器) 連接到通道並實例化時,它們會產生下列其中一個實例

實際實作取決於這些端點連接的通道類型。連接到實作 org.springframework.messaging.SubscribableChannel 介面的通道的通道配接器會產生 EventDrivenConsumer 的實例。另一方面,連接到實作 org.springframework.messaging.PollableChannel 介面(例如 QueueChannel)的通道的通道配接器會產生 PollingConsumer 的實例。

輪詢消費者讓 Spring Integration 元件主動輪詢訊息,而不是以事件驅動的方式處理訊息。

它們代表許多訊息傳遞場景中的關鍵跨領域問題。在 Spring Integration 中,輪詢消費者基於同名的模式,該模式在 Gregor Hohpe 和 Bobby Woolf 的著作《企業整合模式》中有所描述。您可以在該書的網站上找到該模式的描述。

有關輪詢消費者設定的更多資訊,請參閱訊息端點

可輪詢訊息來源

Spring Integration 提供了輪詢消費者模式的第二種變體。當使用輸入通道配接器時,這些配接器通常會被 SourcePollingChannelAdapter 包裝。例如,當從遠端 FTP 伺服器位置檢索訊息時,FTP 輸入通道配接器中描述的配接器配置了一個輪詢器,以定期檢索訊息。因此,當元件配置了輪詢器時,產生的實例屬於下列類型之一

這表示輪詢器用於輸入和輸出訊息傳遞場景。以下是一些使用輪詢器的用例

  • 輪詢某些外部系統,例如 FTP 伺服器、資料庫和 Web 服務

  • 輪詢內部(可輪詢)訊息通道

  • 輪詢內部服務(例如重複執行 Java 類別上的方法)

AOP advice 類別可以應用於輪詢器,在 advice-chain 中,例如啟動交易的交易 advice。從 4.1 版開始,提供了 PollSkipAdvice。輪詢器使用觸發器來決定下一次輪詢的時間。PollSkipAdvice 可以用於抑制(跳過)輪詢,可能是因為存在一些下游條件會阻止訊息被處理。要使用此 advice,您必須為其提供 PollSkipStrategy 的實作。從 4.2.5 版開始,提供了 SimplePollSkipStrategy。要使用它,您可以將實例作為 bean 新增到應用程式內容中,將其注入到 PollSkipAdvice 中,並將其新增到輪詢器的 advice 鏈中。要跳過輪詢,請呼叫 skipPolls()。要恢復輪詢,請呼叫 reset()。4.2 版在此領域增加了更多彈性。請參閱條件輪詢器

本章旨在僅提供輪詢消費者的高階概述,以及它們如何融入訊息通道(請參閱訊息通道)和通道配接器(請參閱通道配接器)的概念。有關訊息端點一般和輪詢消費者特別的更多資訊,請參閱訊息端點

延遲確認可輪詢訊息來源

從 5.0.1 版開始,某些模組提供了 MessageSource 實作,支援延遲確認,直到下游流程完成(或將訊息交給另一個執行緒)。目前僅限於 AmqpMessageSourceKafkaMessageSource

使用這些訊息來源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 標頭(請參閱MessageHeaderAccessor API)會新增到訊息中。當與可輪詢訊息來源一起使用時,標頭的值是 AcknowledgmentCallback 的實例,如下列範例所示

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

並非所有訊息來源(例如,KafkaMessageSource)都支援 REJECT 狀態。它被視為與 ACCEPT 相同。

應用程式可以隨時確認訊息,如下列範例所示

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

如果 MessageSource 連接到 SourcePollingChannelAdapter,當輪詢器執行緒在下游流程完成後返回配接器時,配接器會檢查是否已確認確認,如果沒有,則將其狀態設定為 ACCEPT(如果流程拋出異常,則為 REJECT)。狀態值在 AcknowledgmentCallback.Status 列舉中定義。

Spring Integration 提供了 MessageSourcePollingTemplate 來執行 MessageSource 的臨時輪詢。這也負責在 MessageHandler 回呼返回(或拋出異常)時,在 AcknowledgmentCallback 上設定 ACCEPTREJECT。下列範例顯示如何使用 MessageSourcePollingTemplate 進行輪詢

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

在這兩種情況下(SourcePollingChannelAdapterMessageSourcePollingTemplate),您可以透過在回呼上呼叫 noAutoAck() 來停用自動 ack/nack。如果您將訊息交給另一個執行緒並希望稍後確認,您可以這樣做。並非所有實作都支援此功能(例如,Apache Kafka 不支援,因為偏移量提交必須在同一個執行緒上執行)。

訊息來源的條件輪詢器

本節介紹如何使用條件輪詢器。

背景

Advice 物件,在輪詢器的 advice-chain 中,會建議整個輪詢任務(包括訊息檢索和處理)。這些「環繞 advice」方法無法存取輪詢的任何內容,只能存取輪詢本身。對於諸如使任務具有交易性或由於某些外部條件而跳過輪詢之類的需求,這很好,如前所述。如果我們希望根據輪詢的接收部分的結果採取某些操作,或者如果我們想根據條件調整輪詢器怎麼辦?對於這些情況,Spring Integration 提供了「智慧型」輪詢。

「智慧型」輪詢

5.3 版引入了 ReceiveMessageAdvice 介面。advice-chain 中實作此介面的任何 Advice 物件僅應用於 receive() 操作 - MessageSource.receive()PollableChannel.receive(timeout)。因此,它們只能應用於 SourcePollingChannelAdapterPollingConsumer。此類別實作下列方法

  • beforeReceive(Object source) 此方法在 Object.receive() 方法之前呼叫。它讓您可以檢查和重新設定來源。傳回 false 會取消此輪詢(類似於前面提到的 PollSkipAdvice)。

  • Message<?> afterReceive(Message<?> result, Object source) 此方法在 receive() 方法之後呼叫。同樣,您可以重新設定來源或採取任何動作(可能取決於結果,如果來源未建立訊息,則結果可能為 null)。您甚至可以傳回不同的訊息

執行緒安全

如果 Advice 修改了來源,則不應使用 TaskExecutor 設定輪詢器。如果 Advice 修改了來源,則此類修改不是執行緒安全的,並且可能會導致意外結果,尤其是在高頻率輪詢器中。如果您需要同時處理輪詢結果,請考慮使用下游 ExecutorChannel 而不是向輪詢器新增執行器。

Advice 鏈排序

您應該了解在初始化期間如何處理 advice 鏈。未實作 ReceiveMessageAdviceAdvice 物件會應用於整個輪詢流程,並且在任何 ReceiveMessageAdvice 之前,依序首先全部調用。然後,ReceiveMessageAdvice 物件會在來源 receive() 方法周圍依序調用。例如,如果您有 Advice 物件 a、b、c、d,其中 b 和 d 是 ReceiveMessageAdvice,則物件會依下列順序應用:a、c、b、d。此外,如果來源已經是 Proxy,則 ReceiveMessageAdvice 會在任何現有的 Advice 物件之後調用。如果您希望變更順序,則必須自行連接 Proxy

SimpleActiveIdleReceiveMessageAdvice

此 advice 是 ReceiveMessageAdvice 的簡單實作。當與 DynamicPeriodicTrigger 結合使用時,它會調整輪詢頻率,具體取決於先前的輪詢是否產生訊息。輪詢器也必須參考相同的 DynamicPeriodicTrigger

重要事項:非同步交接
SimpleActiveIdleReceiveMessageAdvice 會根據 receive() 結果修改觸發器。這僅在 advice 在輪詢器執行緒上呼叫時才有效。如果輪詢器有 task-executor,則無效。若要在您希望在輪詢結果之後使用非同步操作的情況下使用此 advice,請稍後執行非同步交接,例如使用 ExecutorChannel

CompoundTriggerAdvice

此 advice 允許根據輪詢是否傳回訊息來選擇兩個觸發器之一。考慮一個使用 CronTrigger 的輪詢器。CronTrigger 實例是不可變的,因此一旦建構就無法變更。考慮一個用例,我們想要使用 cron 運算式每小時觸發一次輪詢,但如果未收到任何訊息,則每分鐘輪詢一次,並且當檢索到訊息時,還原為使用 cron 運算式。

此 advice(和輪詢器)為此目的使用 CompoundTrigger。觸發器的主要觸發器可以是 CronTrigger。當 advice 偵測到未收到任何訊息時,它會將次要觸發器新增至 CompoundTrigger。當調用 CompoundTrigger 實例的 nextExecutionTime 方法時,如果存在次要觸發器,則會委派給次要觸發器。否則,它會委派給主要觸發器。

輪詢器也必須參考相同的 CompoundTrigger

下列範例顯示每小時 cron 運算式的設定,並回退到每分鐘

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
重要事項:非同步交接
CompoundTriggerAdvice 會根據 receive() 結果修改觸發器。這僅在 advice 在輪詢器執行緒上呼叫時才有效。如果輪詢器有 task-executor,則無效。若要在您希望在輪詢結果之後使用非同步操作的情況下使用此 advice,請稍後執行非同步交接,例如使用 ExecutorChannel

僅 MessageSource Advice

某些 advice 可能僅適用於 MessageSource.receive(),而對於 PollableChannel 沒有意義。為此目的,MessageSourceMutator 介面(ReceiveMessageAdvice 的延伸)仍然存在。有關更多資訊,請參閱輸入通道配接器:輪詢多個伺服器和目錄