訊息監聽器容器組態

針對與交易和服務品質相關的 SimpleMessageListenerContainer (SMLC) 和 DirectMessageListenerContainer (DMLC) 的組態,以及它們之間互動的方式,有相當多的選項。適用於 SMLC、DMLC 或 StreamListenerContainer (StLC) 的屬性 (請參閱 使用 RabbitMQ Stream 外掛程式) 在適當的欄位中以核取記號表示。請參閱 選擇容器 以取得資訊,協助您判斷哪個容器適合您的應用程式。

下表顯示容器屬性名稱以及使用命名空間組態 <rabbit:listener-container/> 時的等效屬性名稱 (在括號中)。該元素上的 type 屬性可以是 simple (預設) 或 direct,以分別指定 SMLCDMLC。某些屬性未透過命名空間公開。這些屬性以 N/A 表示。

表 1. 訊息監聽器容器的組態選項
屬性 (Attribute) 描述 SMLC DMLC StLC

ackTimeout
(N/A)

當設定 messagesPerAck 時,此逾時時間會用作傳送 ack 的替代方案。當新訊息抵達時,會將未 ack 訊息的計數與 messagesPerAck 進行比較,並將自上次 ack 以來的時間與此值進行比較。如果任一條件為 true,則會確認訊息。當沒有新訊息抵達且存在未 ack 訊息時,此逾時時間是近似值,因為僅在每個 monitorInterval 檢查條件。另請參閱此表中的 messagesPerAckmonitorInterval

tickmark

acknowledgeMode
(acknowledge)

  • NONE:不傳送 ack (與 channelTransacted=true 不相容)。RabbitMQ 將此稱為「autoack」,因為 Broker 假設所有訊息都已 ack,而無需消費者採取任何動作。

  • MANUAL:監聽器必須透過呼叫 Channel.basicAck() 來確認所有訊息。

  • AUTO:容器會自動確認訊息,除非 MessageListener 擲回例外狀況。請注意,acknowledgeModechannelTransacted 相輔相成 — 如果通道是交易式的,則 Broker 除了 ack 之外,還需要提交通知。這是預設模式。另請參閱 batchSize

tickmark
tickmark

adviceChain
(advice-chain)

要套用至監聽器執行的 AOP Advice 陣列。這可用於套用額外的橫切關注點,例如在 Broker 故障時自動重試。請注意,CachingConnectionFactory 會處理 AMQP 錯誤後的簡單重新連線,只要 Broker 仍然存活即可。

tickmark
tickmark

afterReceivePostProcessors
(N/A)

在叫用監聽器之前叫用的 MessagePostProcessor 實例陣列。後處理器可以實作 PriorityOrderedOrdered。陣列會排序,未排序的成員最後叫用。如果後處理器傳回 null,則會捨棄訊息 (並在適當時確認)。

tickmark
tickmark

alwaysRequeueWithTxManagerRollback
(N/A)

設定為 true 以在設定交易管理員時,永遠在回滾時重新佇列訊息。

tickmark
tickmark

autoDeclare
(auto-declare)

當設定為 true (預設值) 時,容器會使用 RabbitAdmin 重新宣告所有 AMQP 物件 (佇列、交換器、繫結),如果它偵測到至少有一個佇列在啟動期間遺失,可能是因為它是 auto-delete 或過期的佇列,但如果佇列因任何原因遺失,則會繼續重新宣告。若要停用此行為,請將此屬性設定為 false。請注意,如果所有佇列都遺失,則容器無法啟動。

在 1.6 版之前,如果內容中有多個管理員,容器會隨機選取一個。如果沒有管理員,則會在內部建立一個。在任何一種情況下,都可能導致非預期的結果。從 1.6 版開始,為了讓 autoDeclare 運作,內容中必須正好有一個 RabbitAdmin,或者必須使用 rabbitAdmin 屬性在容器上組態對特定實例的參考。
tickmark
tickmark

autoStartup
(auto-startup)

旗標,指示容器是否應在 ApplicationContext 執行時啟動 (作為 SmartLifecycle 回呼的一部分,這會在所有 Bean 初始化完成後發生)。預設值為 true,但如果您的 Broker 可能在啟動時無法使用,您可以將其設定為 false,並在您知道 Broker 已準備就緒時稍後手動呼叫 start()

tickmark
tickmark
tickmark

batchSize
(transaction-size) (batch-size)

當與設定為 AUTOacknowledgeMode 搭配使用時,容器會嘗試在傳送 ack 之前處理最多此數量的訊息 (等待每個訊息直到接收逾時設定)。這也是交易通道提交的時間。如果 prefetchCount 小於 batchSize,則會增加以符合 batchSize

tickmark

batchingStrategy
(N/A)

在 debatchng 訊息時使用的策略。預設值為 SimpleDebatchingStrategy。請參閱 批次處理搭配批次的 @RabbitListener

tickmark
tickmark

channelTransacted
(channel-transacted)

布林旗標,表示所有訊息都應在交易中確認 (手動或自動)。

tickmark
tickmark

concurrency
(N/A)

m-n 每個監聽器的並行消費者範圍 (最小值、最大值)。如果僅提供 n,則 n 是固定數量的消費者。請參閱 監聽器並行

tickmark

concurrentConsumers
(concurrency)

每個監聽器最初啟動的並行消費者數量。請參閱 監聽器並行。對於 StLC,並行性是透過多載的 superStream 方法控制;請參閱 使用單一作用中消費者取用 Super Streams

tickmark
tickmark

connectionFactory
(connection-factory)

ConnectionFactory 的參考。當使用 XML 命名空間進行組態時,預設參考的 Bean 名稱是 rabbitConnectionFactory

tickmark
tickmark

consecutiveActiveTrigger
(min-consecutive-active)

消費者連續接收訊息的最小數量,而未發生接收逾時,在考慮啟動新消費者時。也受 'batchSize' 的影響。請參閱 監聽器並行。預設值:10。

tickmark

consecutiveIdleTrigger
(min-consecutive-idle)

消費者必須經歷的最小接收逾時次數,然後才考慮停止消費者。也受 'batchSize' 的影響。請參閱 監聽器並行。預設值:10。

tickmark

consumerBatchEnabled
(batch-enabled)

如果 MessageListener 支援,將此設定為 true 會啟用離散訊息的批次處理,最多為 batchSize;如果在 receiveTimeout 中沒有新訊息抵達,或收集批次訊息的時間超過 batchReceiveTimeout,則會傳遞部分批次。當此值為 false 時,批次處理僅支援由生產者建立的批次;請參閱 批次處理

tickmark

consumerCustomizer
(N/A)

用於修改容器建立的 Stream 消費者的 ConsumerCustomizer Bean。

tickmark

consumerStartTimeout
(N/A)

等待消費者執行緒啟動的時間 (以毫秒為單位)。如果此時間過去,則會寫入錯誤記錄。可能發生這種情況的一個範例是,如果組態的 taskExecutor 沒有足夠的執行緒來支援容器 concurrentConsumers

請參閱 執行緒與非同步消費者。預設值:60000 (一分鐘)。

tickmark

consumerTagStrategy
(consumer-tag-strategy)

設定 ConsumerTagStrategy 的實作,啟用為每個消費者建立 (唯一) 標籤。

tickmark
tickmark

consumersPerQueue
(consumers-per-queue)

為每個組態的佇列建立的消費者數量。請參閱 監聽器並行

tickmark

consumeDelay
(N/A)

當使用 RabbitMQ Sharding 外掛程式concurrentConsumers > 1 時,存在競爭條件,可能會阻止消費者在分片之間均勻分配。使用此屬性可在消費者啟動之間新增少量延遲,以避免此競爭條件。您應該實驗值以判斷適合您環境的延遲。

tickmark
tickmark

debatchingEnabled
(N/A)

當為 true 時,監聽器容器將 debatch 批次訊息,並使用批次中的每個訊息叫用監聽器。從 2.2.7 版開始,如果監聽器是 BatchMessageListenerChannelAwareBatchMessageListener生產者建立的批次 將 debatch 為 List<Message>。否則,批次中的訊息會一次呈現一個。預設值為 true。請參閱 批次處理搭配批次的 @RabbitListener

tickmark
tickmark

declarationRetries
(declaration-retries)

被動佇列宣告失敗時的重試次數。被動佇列宣告發生在消費者啟動時,或從多個佇列取用時,並非所有佇列在初始化期間都可用時。當在重試耗盡後,仍無法被動宣告任何組態的佇列時 (無論任何原因),容器行為由先前描述的 'missingQueuesFatal` 屬性控制。預設值:三次重試 (總共四次嘗試)。

tickmark

defaultRequeueRejected
(requeue-rejected)

決定是否應重新佇列因監聽器擲回例外狀況而被拒絕的訊息。預設值:true

tickmark
tickmark

errorHandler
(error-handler)

ErrorHandler 策略的參考,用於處理在 MessageListener 執行期間可能發生的任何未捕獲的例外狀況。預設值:ConditionalRejectingErrorHandler

tickmark
tickmark

exclusive
(exclusive)

決定此容器中的單一消費者是否具有佇列的獨佔存取權。當此值為 true 時,容器的並行性必須為 1。如果另一個消費者具有獨佔存取權,則容器會根據 recovery-intervalrecovery-back-off 嘗試恢復消費者。當使用命名空間時,此屬性會與佇列名稱一起出現在 <rabbit:listener/> 元素上。預設值:false

tickmark
tickmark

exclusiveConsumerExceptionLogger
(N/A)

當獨佔消費者無法取得佇列的存取權時使用的例外狀況記錄器。預設情況下,此記錄會記錄在 WARN 層級。

tickmark
tickmark

failedDeclarationRetryInterval
(failed-declaration -retry-interval)

被動佇列宣告重試嘗試之間的間隔。被動佇列宣告發生在消費者啟動時,或從多個佇列取用時,並非所有佇列在初始化期間都可用時。預設值:5000 (五秒)。

tickmark
tickmark

forceCloseChannel
(N/A)

如果消費者在 shutdownTimeout 內未回應關閉,如果此值為 true,則會關閉通道,導致任何未 ack 的訊息重新佇列。自 2.0 版起預設為 true。您可以將其設定為 false 以還原為先前的行為。

tickmark
tickmark

forceStop
(N/A)

設定為 true 以在處理目前記錄後停止 (當容器停止時);導致所有預取訊息重新佇列。依預設,容器將取消消費者並在停止之前處理所有預取訊息。自 2.4.14 版、3.0.6 版起預設為 false

tickmark
tickmark

globalQos
(global-qos)

當為 true 時,prefetchCount 會全域套用至通道,而不是套用至通道上的每個消費者。如需詳細資訊,請參閱 basicQos.global

tickmark
tickmark

(group)

僅在使用命名空間時可用。指定時,會註冊類型為 Collection<MessageListenerContainer> 且具有此名稱的 Bean,並且每個 <listener/> 元素的容器都會新增至集合中。這允許例如透過迭代集合來啟動和停止容器群組。如果多個 <listener-container/> 元素具有相同群組值,則集合中的容器會形成所有如此指定的容器的聚合。

tickmark
tickmark

idleEventInterval
(idle-event-interval)

請參閱 偵測閒置的非同步消費者

tickmark
tickmark

javaLangErrorHandler
(N/A)

當容器執行緒捕獲 Error 時呼叫的 AbstractMessageListenerContainer.JavaLangErrorHandler 實作。預設實作呼叫 System.exit(99);若要還原為先前的行為 (不執行任何動作),請新增無作業處理常式。

tickmark
tickmark

maxConcurrentConsumers
(max-concurrency)

如果需要,依需求啟動的最大並行消費者數量。必須大於或等於 'concurrentConsumers'。請參閱 監聽器並行

tickmark

messagesPerAck
(N/A)

在 ack 之間接收的訊息數量。使用此選項可減少傳送至 Broker 的 ack 數量 (以增加重新傳遞訊息的可能性為代價)。一般而言,您應該僅在高容量監聽器容器上設定此屬性。如果設定此選項且訊息被拒絕 (擲回例外狀況),則會確認擱置中的 ack,並拒絕失敗的訊息。不允許使用交易通道。如果 prefetchCount 小於 messagesPerAck,則會增加以符合 messagesPerAck。預設值:確認每個訊息。另請參閱此表中的 ackTimeout

tickmark

mismatchedQueuesFatal
(mismatched-queues-fatal)

當容器啟動時,如果此屬性為 true (預設值:false),則容器會檢查內容中宣告的所有佇列是否與 Broker 上已有的佇列相容。如果存在不符的屬性 (例如 auto-delete) 或引數 (例如 x-message-ttl),則容器 (和應用程式內容) 無法啟動並發生嚴重例外狀況。

如果在恢復期間偵測到問題 (例如,在連線遺失之後),則會停止容器。

應用程式內容中必須有單一 RabbitAdmin (或使用 rabbitAdmin 屬性在容器上特別組態一個)。否則,此屬性必須為 false

如果 Broker 在初始啟動期間不可用,則容器會啟動,並且在建立連線時檢查條件。
檢查是針對內容中的所有佇列完成的,而不僅僅是特定監聽器組態為使用的佇列。如果您希望將檢查限制為僅限於容器使用的那些佇列,則應為容器組態個別的 RabbitAdmin,並使用 rabbitAdmin 屬性提供對它的參考。如需詳細資訊,請參閱 條件式宣告
在啟動標記為 @Lazy 的 Bean 中 @RabbitListener 的容器時,會停用不符的佇列引數偵測。這是為了避免可能導致延遲此類容器啟動長達 60 秒的潛在死鎖。使用 Lazy 監聽器 Bean 的應用程式應在取得 Lazy Bean 的參考之前檢查佇列引數。
tickmark
tickmark

missingQueuesFatal
(missing-queues-fatal)

當設定為 true (預設值) 時,如果在 Broker 上沒有任何組態的佇列可用,則會視為嚴重錯誤。這會導致應用程式內容在啟動期間初始化失敗。此外,當佇列在容器執行時被刪除時,依預設,消費者會重試三次以連線到佇列 (間隔五秒),如果這些嘗試失敗,則會停止容器。

這在先前的版本中無法組態。

當設定為 false 時,在進行三次重試後,容器會進入恢復模式,如同其他問題一樣,例如 Broker 關閉。容器會根據 recoveryInterval 屬性嘗試恢復。在每次恢復嘗試期間,每個消費者都會再次嘗試四次以被動宣告佇列,間隔五秒。此程序會無限期地繼續。

您也可以使用屬性 Bean 為所有容器全域設定屬性,如下所示

<util:properties
        id="spring.amqp.global.properties">
    <prop key="mlc.missing.queues.fatal">
        false
    </prop>
</util:properties>

此全域屬性不適用於已設定明確 missingQueuesFatal 屬性的任何容器。

預設重試屬性 (三次重試,間隔五秒) 可以透過設定以下屬性來覆寫。

在啟動標記為 @Lazy 的 Bean 中 @RabbitListener 的容器時,會停用遺失佇列偵測。這是為了避免可能導致延遲此類容器啟動長達 60 秒的潛在死鎖。使用 Lazy 監聽器 Bean 的應用程式應在取得 Lazy Bean 的參考之前檢查佇列。
tickmark
tickmark

monitorInterval
(monitor-interval)

使用 DMLC 時,會排程任務以在此間隔執行,以監視消費者的狀態並恢復任何失敗的消費者。

tickmark

noLocal
(N/A)

設定為 true 以停用從伺服器到在相同通道連線上發布訊息的消費者的傳遞。

tickmark
tickmark

phase
(phase)

autoStartuptrue 時,此容器應在其中啟動和停止的生命週期階段。值越低,此容器啟動得越早,停止得越晚。預設值為 Integer.MAX_VALUE,表示容器會盡可能晚啟動並盡可能早停止。

tickmark
tickmark

possibleAuthenticationFailureFatal
(possible-authentication-failure-fatal)

當設定為 true (SMLC 的預設值) 時,如果在連線期間擲回 PossibleAuthenticationFailureException,則會視為嚴重錯誤。這會導致應用程式內容在啟動期間初始化失敗 (如果容器組態為自動啟動)。

2.0 版起。

DirectMessageListenerContainer

當設定為 false (預設值) 時,每個消費者都會根據 monitorInterval 嘗試重新連線。

SimpleMessageListenerContainer

當設定為 false 時,在進行 3 次重試後,容器將進入恢復模式,如同其他問題一樣,例如 Broker 關閉。容器將根據 recoveryInterval 屬性嘗試恢復。在每次恢復嘗試期間,每個消費者都會再次嘗試 4 次以啟動。此程序將無限期地繼續。

您也可以使用屬性 Bean 為所有容器全域設定屬性,如下所示

<util:properties
    id="spring.amqp.global.properties">
  <prop
    key="mlc.possible.authentication.failure.fatal">
     false
  </prop>
</util:properties>

此全域屬性將不適用於已設定明確 missingQueuesFatal 屬性的任何容器。

預設重試屬性 (3 次重試,間隔 5 秒) 可以使用此屬性之後的屬性覆寫。

tickmark
tickmark

prefetchCount
(prefetch)

每個消費者可以未確認的訊息數量上限。此值越高,訊息傳遞速度越快,但非循序處理的風險越高。如果 acknowledgeModeNONE,則會忽略。如有必要,這會增加以符合 batchSizemessagePerAck。自 2.0 版起預設值為 250。您可以將其設定為 1 以還原為先前的行為。

在某些情況下,預取值應該較低 — 例如,對於大型訊息,尤其是如果處理速度很慢 (訊息可能會在用戶端程序中累積大量記憶體),並且如果需要嚴格的訊息排序 (在這種情況下,預取值應設定回 1)。此外,對於低容量訊息傳遞和多個消費者 (包括單一監聽器容器實例中的並行性),您可能希望減少預取以獲得更均勻的跨消費者訊息分配。

另請參閱 globalQos

tickmark
tickmark

rabbitAdmin
(admin)

當監聽器容器監聽至少一個自動刪除佇列,並且在啟動期間發現它遺失時,容器會使用 RabbitAdmin 來宣告佇列以及任何相關的繫結和交換器。如果這些元素組態為使用條件式宣告 (請參閱 條件式宣告),則容器必須使用組態為宣告這些元素的管理員。在此處指定該管理員。僅當將自動刪除佇列與條件式宣告搭配使用時才需要。如果您不希望在容器啟動之前宣告自動刪除佇列,請在管理員上將 auto-startup 設定為 false。預設為宣告所有非條件式元素的 RabbitAdmin

tickmark
tickmark

receiveTimeout
(receive-timeout)

等待每個訊息的最長時間。如果 acknowledgeMode=NONE,則效果非常小 — 容器會循環並要求另一個訊息。對於具有 batchSize > 1 的交易式 Channel,效果最大,因為它可能會導致已取用的訊息在逾時時間到期之前未被確認。當 consumerBatchEnabled 為 true 時,如果在此逾時時間發生在批次完成之前,則會傳遞部分批次。

tickmark

batchReceiveTimeout
(batch-receive-timeout)

收集批次訊息的逾時時間 (以毫秒為單位)。它限制了等待填滿 batchSize 的時間。當 batchSize > 1 且收集批次訊息的時間大於 batchReceiveTime 時,將會傳遞批次。預設值為 0 (無逾時)。

tickmark

recoveryBackOff
(recovery-back-off)

指定 BackOff 用於在消費者因非嚴重原因而啟動失敗時,嘗試啟動消費者之間的間隔。預設值為 FixedBackOff,無限次重試,每次間隔五秒。與 recoveryInterval 互斥。

tickmark
tickmark

recoveryInterval
(recovery-interval)

決定在消費者因非致命性原因啟動失敗時,嘗試重新啟動消費者之間的時間間隔(毫秒)。預設值:5000。與 recoveryBackOff 互斥。

tickmark
tickmark

retryDeclarationInterval
(missing-queue- retry-interval)

如果在消費者初始化期間,只有部分設定的佇列可用,消費者會開始從這些可用的佇列消費訊息。消費者會嘗試使用此間隔被動宣告遺失的佇列。當此間隔時間到期時,會再次使用 'declarationRetries' 和 'failedDeclarationRetryInterval'。如果仍然有遺失的佇列,消費者會再次等待此間隔時間,然後再次嘗試。此過程會無限期地持續下去,直到所有佇列都可用為止。預設值:60000(一分鐘)。

tickmark

shutdownTimeout
(N/A)

當容器關閉時(例如,如果其封閉的 ApplicationContext 被關閉),它會等待處理中的訊息完成處理,直到達到此限制。預設值為五秒。

tickmark
tickmark

startConsumerMinInterval
(min-start-interval)

在根據需求啟動每個新消費者之前,必須經過的時間間隔(毫秒)。請參閱 Listener Concurrency。預設值:10000(10 秒)。

tickmark

statefulRetryFatal
WithNullMessageId (N/A)

當使用具狀態的重試建議時,如果收到缺少 messageId 屬性的訊息,預設情況下會將其視為消費者的致命錯誤(消費者會停止運作)。將此設定為 false 以捨棄(或路由到死信佇列)此類訊息。

tickmark
tickmark

stopConsumerMinInterval
(min-stop-interval)

當偵測到閒置消費者時,自上次消費者停止後,必須經過的時間間隔(毫秒),才能停止消費者。請參閱 Listener Concurrency。預設值:60000(一分鐘)。

tickmark

streamConverter
(N/A)

用於將原生 Stream 訊息轉換為 Spring AMQP 訊息的 StreamMessageConverter

tickmark

taskExecutor
(task-executor)

Spring TaskExecutor(或標準 JDK 1.5+ Executor)的參考,用於執行 listener invoker。預設值為 SimpleAsyncTaskExecutor,使用內部管理的執行緒。

tickmark
tickmark

taskScheduler
(task-scheduler)

對於 DMLC,用於在 'monitorInterval' 執行監控任務的 scheduler。

tickmark

transactionManager
(transaction-manager)

用於 listener 操作的外部交易管理器。也與 channelTransacted 互補 —— 如果 Channel 是交易性的,其交易會與外部交易同步。

tickmark
tickmark