訊息通道實作

Spring Integration 提供了不同的訊息通道實作。以下章節簡要說明每個實作。

PublishSubscribeChannel

PublishSubscribeChannel 實作會將傳送給它的任何 Message 廣播給其所有訂閱的處理常式。這最常用於傳送事件訊息,其主要作用是通知(相對於文件訊息,文件訊息通常旨在由單一處理常式處理)。請注意,PublishSubscribeChannel 僅用於傳送。由於它會在呼叫其 send(Message) 方法時直接廣播給其訂閱者,因此消費者無法輪詢訊息(它未實作 PollableChannel,因此沒有 receive() 方法)。相反地,任何訂閱者本身都必須是 MessageHandler,並且依序呼叫訂閱者的 handleMessage(Message) 方法。

在 3.0 版之前,在沒有訂閱者的 PublishSubscribeChannel 上叫用 send 方法會傳回 false。當與 MessagingTemplate 結合使用時,會擲回 MessageDeliveryException。從 3.0 版開始,行為已變更,使得如果至少存在最少數量的訂閱者(且成功處理訊息),則一律將 send 視為成功。可以透過設定 minSubscribers 屬性來修改此行為,該屬性預設為 0

如果您使用 TaskExecutor,則僅訂閱者的正確數量存在才會用於此判斷,因為訊息的實際處理是以非同步方式執行。

QueueChannel

QueueChannel 實作封裝了佇列。與 PublishSubscribeChannel 不同,QueueChannel 具有點對點語意。換句話說,即使通道有多個消費者,也只有其中一個消費者應接收傳送到該通道的任何 Message。它提供了預設的無引數建構子(提供基本上無限大的容量 Integer.MAX_VALUE)以及接受佇列容量的建構子,如下列清單所示

public QueueChannel(int capacity)

尚未達到容量限制的通道會將訊息儲存在其內部佇列中,並且即使沒有接收者準備好處理訊息,send(Message<?>) 方法也會立即傳回。如果佇列已達到容量,則傳送者會封鎖,直到佇列中有可用空間為止。或者,如果您使用具有額外逾時參數的 send 方法,則佇列會封鎖,直到有可用空間或逾時期間經過(以先發生者為準)。同樣地,如果佇列中有訊息可用,則 receive() 呼叫會立即傳回,但是,如果佇列為空,則 receive 呼叫可能會封鎖,直到訊息可用或逾時(如果已提供)經過為止。在任一情況下,都可以強制立即傳回,而與佇列的狀態無關,方法是傳遞逾時值 0。但是請注意,對沒有 timeout 參數的 send()receive() 版本的呼叫會無限期地封鎖。

PriorityChannel

雖然 QueueChannel 強制執行先進先出 (FIFO) 排序,但 PriorityChannel 是一種替代實作,允許根據優先順序在通道內排序訊息。預設情況下,優先順序由每個訊息中的 priority 標頭決定。但是,對於自訂優先順序決定邏輯,可以將 Comparator<Message<?>> 類型的比較器提供給 PriorityChannel 建構子。

RendezvousChannel

RendezvousChannel 啟用了「直接交接」情境,其中傳送者會封鎖,直到另一方叫用通道的 receive() 方法。另一方會封鎖,直到傳送者傳送訊息為止。在內部,此實作與 QueueChannel 非常相似,不同之處在於它使用 SynchronousQueueBlockingQueue 的零容量實作)。這在傳送者和接收者在不同執行緒中運作,但以非同步方式將訊息放入佇列不適當的情況下效果很好。換句話說,使用 RendezvousChannel,傳送者知道某些接收者已接受訊息,而使用 QueueChannel,訊息將已儲存到內部佇列中,並且可能永遠不會被接收。

請記住,預設情況下,所有這些基於佇列的通道都僅在記憶體中儲存訊息。當需要持久性時,您可以在 'queue' 元素中提供 'message-store' 屬性以參考持久性 MessageStore 實作,或者您可以將本機通道替換為由持久性代理程式支援的通道,例如 JMS 支援的通道或通道配接器。後一個選項可讓您利用任何 JMS 供應商的訊息持久性實作,如 JMS 支援中所述。但是,當不需要在佇列中緩衝時,最簡單的方法是依賴 DirectChannel,這將在下一節中討論。

RendezvousChannel 也適用於實作請求-回覆作業。傳送者可以建立 RendezvousChannel 的暫時性匿名執行個體,然後在建置 Message 時將其設定為 'replyChannel' 標頭。在傳送該 Message 之後,傳送者可以立即呼叫 receive(選擇性地提供逾時值),以便在等待回覆 Message 時封鎖。這與 Spring Integration 的許多請求-回覆元件在內部使用的實作非常相似。

DirectChannel

DirectChannel 具有點對點語意,但在其他方面更類似於 PublishSubscribeChannel,而不是先前描述的任何基於佇列的通道實作。它實作 SubscribableChannel 介面而不是 PollableChannel 介面,因此它會將訊息直接分派給訂閱者。但是,作為點對點通道,它與 PublishSubscribeChannel 的不同之處在於,它會將每個 Message 傳送到單一訂閱的 MessageHandler

除了是最簡單的點對點通道選項之外,其最重要的功能之一是它允許單一執行緒在通道的「兩側」執行作業。例如,如果處理常式訂閱 DirectChannel,則將 Message 傳送到該通道會直接在傳送者的執行緒中觸發該處理常式的 handleMessage(Message) 方法叫用,然後才能傳回 send() 方法叫用。

提供具有此行為的通道實作的主要動機是支援必須跨越通道的交易,同時仍然受益於通道提供的抽象化和鬆散耦合。如果在交易範圍內叫用 send() 呼叫,則處理常式叫用的結果(例如,更新資料庫記錄)會在決定該交易的最終結果(提交或回復)中發揮作用。

由於 DirectChannel 是最簡單的選項,並且不會增加輪詢器的執行緒排程和管理所需的任何額外負擔,因此它是 Spring Integration 中的預設通道類型。一般概念是定義應用程式的通道,考慮哪些通道需要提供緩衝或節流輸入,並將這些通道修改為基於佇列的 PollableChannels。同樣地,如果通道需要廣播訊息,則它不應是 DirectChannel,而應是 PublishSubscribeChannel。稍後,我們將展示如何設定這些通道中的每一個。

DirectChannel 在內部委派給訊息分派器以叫用其訂閱的訊息處理常式,並且該分派器可以具有由 load-balancerload-balancer-ref 屬性(互斥)公開的負載平衡策略。訊息分派器使用負載平衡策略來協助判斷當多個訊息處理常式訂閱同一個通道時,訊息如何在訊息處理常式之間分配。為了方便起見,load-balancer 屬性公開了指向 LoadBalancingStrategy 的預先存在實作的列舉值。round-robin(在處理常式之間輪流進行負載平衡)和 none(對於想要明確停用負載平衡的情況)是唯一可用的值。未來版本中可能會新增其他策略實作。但是,從 3.0 版開始,您可以提供您自己的 LoadBalancingStrategy 實作,並透過使用 load-balancer-ref 屬性注入它,該屬性應指向實作 LoadBalancingStrategy 的 Bean,如下列範例所示

FixedSubscriberChannel 是一個 SubscribableChannel,僅支援無法取消訂閱的單一 MessageHandler 訂閱者。這適用於不需要其他訂閱者且不需要通道攔截器的高輸送量效能用例。

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

請注意,load-balancerload-balancer-ref 屬性是互斥的。

負載平衡也與布林值 failover 屬性結合使用。如果 failover 值為 true(預設值),則當先前的處理常式擲回例外狀況時,分派器會回復為任何後續處理常式(在必要時)。順序由處理常式本身上定義的選用順序值決定,如果不存在此類值,則順序由處理常式訂閱的順序決定。

如果在特定情況下,分派器始終嘗試叫用第一個處理常式,然後在每次發生錯誤時以相同的固定順序序列回復,則不應提供負載平衡策略。換句話說,即使未啟用負載平衡,分派器仍然支援 failover 布林值屬性。但是,在沒有負載平衡的情況下,處理常式的叫用始終從第一個開始,根據其順序。例如,當主要、次要、第三等等有明確定義時,此方法效果很好。使用命名空間支援時,任何端點上的 order 屬性都會決定順序。

請記住,負載平衡和 failover 僅在通道有多個訂閱的訊息處理常式時才適用。使用命名空間支援時,這表示多個端點共用在 input-channel 屬性中定義的同一個通道參考。

從 5.2 版開始,當 failover 為 true 時,目前的處理常式的失敗以及失敗的訊息會記錄在 debug 下,如果已分別設定,則記錄在 info 下。

ExecutorChannel

ExecutorChannel 是一個點對點通道,支援與 DirectChannel 相同的分派器設定(負載平衡策略和 failover 布林值屬性)。這兩種分派通道類型之間的主要區別在於,ExecutorChannel 委派給 TaskExecutor 的執行個體以執行分派。這表示 send 方法通常不會封鎖,但也表示處理常式叫用可能不會在傳送者的執行緒中發生。因此,它不支援跨越傳送者和接收處理常式的交易。

傳送者有時可能會封鎖。例如,當使用具有節流用戶端拒絕策略的 TaskExecutor(例如 ThreadPoolExecutor.CallerRunsPolicy)時,每當執行緒集區達到其最大容量且執行器的工作佇列已滿時,傳送者的執行緒都可以執行該方法。由於這種情況只會以不可預測的方式發生,因此您不應依賴它來進行交易。

PartitionedChannel

從 6.1 版開始,提供了 PartitionedChannel 實作。這是 AbstractExecutorChannel 的擴充功能,表示點對點分派邏輯,其中實際消耗是在特定執行緒上處理的,該執行緒由從傳送到此通道的訊息評估的分割區金鑰決定。此通道類似於上述的 ExecutorChannel,但不同之處在於具有相同分割區金鑰的訊息始終在同一個執行緒中處理,從而保留排序。它不需要外部 TaskExecutor,但可以使用自訂 ThreadFactory 進行設定(例如 Thread.ofVirtual().name("partition-", 0).factory())。此工廠用於將單一執行緒執行器填入每個分割區的 MessageDispatcher 委派中。預設情況下,IntegrationMessageHeaderAccessor.CORRELATION_ID 訊息標頭用作分割區金鑰。此通道可以設定為簡單的 Bean

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}

該通道將具有 3 個分割區 - 專用執行緒;將使用 partitionKey 標頭來決定訊息將在哪個分割區中處理。請參閱 PartitionedChannel 類別 Javadocs 以取得更多資訊。

FluxMessageChannel

FluxMessageChannelorg.reactivestreams.Publisher 實作,用於將傳送的訊息 "sinking" 到內部 reactor.core.publisher.Flux 中,以供下游反應式訂閱者按需使用。此通道實作既不是 SubscribableChannel,也不是 PollableChannel,因此只有 org.reactivestreams.Subscriber 執行個體可以用於從此通道取用,從而遵守反應式串流的回壓特性。另一方面,FluxMessageChannel 實作了 ReactiveStreamsSubscribableChannel 及其 subscribeTo(Publisher<Message<?>>) 合約,允許從反應式來源發布者接收事件,從而將反應式串流橋接到整合流程中。為了實現整個整合流程的完全反應式行為,此類通道必須放置在流程中的所有端點之間。

請參閱 反應式串流支援,以取得有關與反應式串流互動的更多資訊。

範圍通道

Spring Integration 1.0 提供了 ThreadLocalChannel 實作,但已從 2.0 開始移除。現在,處理相同需求的更通用方法是將 scope 屬性新增至通道。屬性的值可以是內容中可用的範圍的名稱。例如,在 Web 環境中,某些範圍可用,並且任何自訂範圍實作都可以在內容中註冊。下列範例顯示了應用於通道的執行緒本機範圍,包括範圍本身的註冊

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

先前範例中定義的通道也在內部委派給佇列,但通道繫結到目前的執行緒,因此佇列的內容也類似地繫結。這樣,傳送到通道的執行緒稍後可以接收相同的訊息,但沒有其他執行緒可以存取它們。雖然很少需要執行緒範圍的通道,但在使用 DirectChannel 執行個體來強制執行單一執行緒作業,但任何回覆訊息都應傳送到「終端」通道的情況下,它們可能很有用。如果該終端通道是執行緒範圍的,則原始傳送執行緒可以從終端通道收集其回覆。

現在,由於任何通道都可以範圍化,因此除了執行緒本機之外,您還可以定義自己的範圍。