連線與資源管理
雖然我們在前一節描述的 AMQP 模型是通用的,並且適用於所有實作,但當我們深入探討資源管理時,細節會因 Broker 實作而異。因此,在本節中,我們將重點放在僅存在於我們的 “spring-rabbit” 模組中的程式碼,因為在這一點上,RabbitMQ 是唯一支援的實作。
用於管理與 RabbitMQ Broker 連線的核心元件是 ConnectionFactory
介面。ConnectionFactory
實作的責任是提供 org.springframework.amqp.rabbit.connection.Connection
的實例,它是 com.rabbitmq.client.Connection
的封裝。
選擇連線工厂
有三種連線工厂可供選擇
-
PooledChannelConnectionFactory
-
ThreadChannelConnectionFactory
-
CachingConnectionFactory
前兩個是在 2.3 版本中新增的。
對於大多數使用案例,應使用 CachingConnectionFactory
。如果您想要確保嚴格的訊息排序,而無需使用作用域操作,則可以使用 ThreadChannelConnectionFactory
。PooledChannelConnectionFactory
類似於 CachingConnectionFactory
,因為它使用單一連線和通道池。它的實作更簡單,但它不支援相關的發布者確認。
所有三種工厂都支援簡單的發布者確認。
當設定 RabbitTemplate
以使用單獨連線時,從 2.3.2 版本開始,您現在可以將發布連線工厂設定為不同的類型。預設情況下,發布工厂是相同的類型,並且在主工厂上設定的任何屬性也會傳播到發布工厂。
從 3.1 版本開始,AbstractConnectionFactory
包含 connectionCreatingBackOff
屬性,它在連線模組中支援退避策略。目前,在 createChannel()
的行為中支援處理在達到 channelMax
限制時發生的例外,實作基於嘗試和間隔的退避策略。
PooledChannelConnectionFactory
此工厂管理單一連線和兩個通道池,基於 Apache Pool2。一個池用於交易通道,另一個池用於非交易通道。這些池是具有預設設定的 GenericObjectPool
s;提供了一個回呼來設定池;有關更多資訊,請參閱 Apache 文件。
要使用此工厂,Apache commons-pool2
jar 必須在類別路徑上。
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
此工厂管理單一連線和兩個 ThreadLocal
s,一個用於交易通道,另一個用於非交易通道。此工厂確保同一執行緒上的所有操作都使用相同的通道(只要它保持開啟)。這有助於實現嚴格的訊息排序,而無需作用域操作。為了避免記憶體洩漏,如果您的應用程式使用許多短生命週期的執行緒,您必須呼叫工厂的 closeThreadChannel()
以釋放通道資源。從 2.3.7 版本開始,執行緒可以將其通道傳輸到另一個執行緒。有關更多資訊,請參閱多執行緒環境中的嚴格訊息排序。
CachingConnectionFactory
提供的第三個實作是 CachingConnectionFactory
,預設情況下,它建立一個可以由應用程式共用的單一連線 Proxy。連線的共用是可能的,因為使用 AMQP 進行訊息傳遞的 “工作單元” 實際上是一個 “通道”(在某些方面,這類似於 JMS 中連線和會話之間的關係)。連線實例提供了一個 createChannel
方法。CachingConnectionFactory
實作支援快取這些通道,並且它維護基於通道是否為交易性的通道的單獨快取。建立 CachingConnectionFactory
的實例時,您可以通過建構子提供 'hostname'。您還應該提供 'username' 和 'password' 屬性。要設定通道快取的大小(預設值為 25),您可以呼叫 setChannelCacheSize()
方法。
從 1.3 版本開始,您可以設定 CachingConnectionFactory
來快取連線以及僅快取通道。在這種情況下,每次呼叫 createConnection()
都會建立一個新的連線(或從快取中檢索一個閒置的連線)。關閉連線會將其返回到快取(如果尚未達到快取大小)。在此類連線上建立的通道也會被快取。在某些環境中,例如從 HA 叢集中消費,與負載平衡器結合以連線到不同的叢集成員等,使用單獨的連線可能很有用。要快取連線,請將 cacheMode
設定為 CacheMode.CONNECTION
。
這不會限制連線數量。相反,它指定允許多少個閒置的開啟連線。 |
從 1.5.5 版本開始,提供了一個名為 connectionLimit
的新屬性。設定此屬性時,它會限制允許的連線總數。設定後,如果達到限制,則使用 channelCheckoutTimeLimit
來等待連線變為閒置。如果超過時間,則會拋出 AmqpTimeoutException
。
當快取模式為 此外,在撰寫本文時, |
重要的是要理解,快取大小(預設情況下)不是限制,而僅是可以快取的通道數。例如,快取大小為 10,實際上可以使用任意數量的通道。如果正在使用超過 10 個通道並且它們都返回到快取,則 10 個進入快取。其餘的則會被物理關閉。
從 1.6 版本開始,預設通道快取大小已從 1 增加到 25。在高容量、多執行緒環境中,小快取意味著通道以高速率建立和關閉。增加預設快取大小可以避免這種開銷。您應該通過 RabbitMQ 管理 UI 監控正在使用的通道,如果您看到許多通道正在建立和關閉,請考慮進一步增加快取大小。快取僅按需成長(以適應應用程式的並行性要求),因此此變更不會影響現有的低容量應用程式。
從 1.4.2 版本開始,CachingConnectionFactory
具有一個名為 channelCheckoutTimeout
的屬性。當此屬性大於零時,channelCacheSize
將成為可以在連線上建立的通道數量的限制。如果達到限制,則呼叫執行緒會阻塞,直到通道可用或達到此逾時時間,在這種情況下,將拋出 AmqpTimeoutException
。
框架內使用的通道(例如,RabbitTemplate )會可靠地返回到快取。如果您在框架外部建立通道(例如,通過直接存取連線並調用 createChannel() ),您必須可靠地返回它們(通過關閉),可能在 finally 區塊中,以避免耗盡通道。 |
以下範例顯示如何建立新的 connection
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
使用 XML 時,設定可能如下例所示
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
還有一個 SingleConnectionFactory 實作,僅在框架的單元測試程式碼中可用。它比 CachingConnectionFactory 更簡單,因為它不快取通道,但由於其缺乏效能和彈性,因此不適用於簡單測試之外的實際使用。如果您由於某些原因需要實作自己的 ConnectionFactory ,則 AbstractConnectionFactory 基底類別可能會提供一個很好的起點。 |
可以使用 rabbit 命名空間快速方便地建立 ConnectionFactory
,如下所示
<rabbit:connection-factory id="connectionFactory"/>
在大多數情況下,此方法是更可取的,因為框架可以為您選擇最佳預設值。建立的實例是 CachingConnectionFactory
。請記住,通道的預設快取大小為 25。如果您希望快取更多通道,請通過設定 'channelCacheSize' 屬性來設定更大的值。在 XML 中,它看起來像如下所示
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
此外,使用命名空間,您可以新增 'channel-cache-size' 屬性,如下所示
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
預設快取模式為 CHANNEL
,但您可以將其設定為改為快取連線。在以下範例中,我們使用 connection-cache-size
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
您可以使用命名空間提供主機和埠屬性,如下所示
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果在叢集環境中執行,您可以使用 addresses 屬性,如下所示
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>
有關 address-shuffle-mode
的資訊,請參閱連線到叢集。
以下範例使用自訂執行緒工厂,該工厂使用 rabbitmq-
作為執行緒名稱的前綴
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
命名連線
從 1.7 版本開始,提供了一個 ConnectionNameStrategy
,用於注入到 AbstractionConnectionFactory
中。產生的名稱用於特定於應用程式的目標 RabbitMQ 連線識別。如果 RabbitMQ 伺服器支援,則連線名稱會顯示在管理 UI 中。此值不必是唯一的,並且不能用作連線識別符號 - 例如,在 HTTP API 請求中。此值應該是人類可讀的,並且是 ClientProperties
中 connection_name
金鑰的一部分。您可以使用簡單的 Lambda,如下所示
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
ConnectionFactory
參數可用於通過某些邏輯來區分目標連線名稱。預設情況下,AbstractConnectionFactory
的 beanName
、表示物件的十六進位字串和內部計數器用於產生 connection_name
。<rabbit:connection-factory>
命名空間元件也提供了 connection-name-strategy
屬性。
SimplePropertyValueConnectionNameStrategy
的實作將連線名稱設定為應用程式屬性。您可以將其宣告為 @Bean
並將其注入到連線工厂中,如下例所示
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
屬性必須存在於應用程式上下文的 Environment
中。
當使用 Spring Boot 及其自動設定的連線工厂時,您只需要宣告 ConnectionNameStrategy @Bean 。Boot 會自動偵測 Bean 並將其連接到工厂。 |
封鎖的連線和資源限制
連線可能會因來自 Broker 的互動而被封鎖,這對應於記憶體警報。從 2.0 版本開始,可以為 org.springframework.amqp.rabbit.connection.Connection
提供 com.rabbitmq.client.BlockedListener
實例,以接收連線封鎖和解除封鎖事件的通知。此外,AbstractConnectionFactory
分別通過其內部 BlockedListener
實作發出 ConnectionBlockedEvent
和 ConnectionUnblockedEvent
。這些讓您可以提供應用程式邏輯,以適當地回應 Broker 上的問題,並(例如)採取一些糾正措施。
當應用程式設定了單一 CachingConnectionFactory 時,就像 Spring Boot 自動設定的預設情況一樣,當連線被 Broker 封鎖時,應用程式會停止工作。當它被 Broker 封鎖時,它的任何用戶端都會停止工作。如果我們在同一個應用程式中同時擁有生產者和消費者,當生產者封鎖連線(因為 Broker 上沒有更多資源)並且消費者無法釋放它們(因為連線被封鎖)時,我們可能會最終陷入死鎖。為了緩解此問題,我們建議再擁有一個具有相同選項的單獨 CachingConnectionFactory 實例 - 一個用於生產者,一個用於消費者。對於在消費者執行緒上執行的交易生產者來說,單獨的 CachingConnectionFactory 是不可能的,因為它們應該重複使用與消費者交易關聯的 Channel 。 |
從 2.0.2 版本開始,RabbitTemplate
具有一個設定選項,可以自動使用第二個連線工厂,除非正在使用交易。有關更多資訊,請參閱使用單獨的連線。發布者連線的 ConnectionNameStrategy
與主要策略相同,並在方法呼叫結果後附加 .publisher
。
從 1.7.7 版本開始,提供了一個 AmqpResourceNotAvailableException
,當 SimpleConnection.createChannel()
無法建立 Channel
時(例如,因為達到 channelMax
限制並且快取中沒有可用的通道),會拋出此例外。您可以在 RetryPolicy
中使用此例外,以在一些退避後恢復操作。
設定底層用戶端連線工厂
CachingConnectionFactory
使用 Rabbit 用戶端 ConnectionFactory
的實例。當在 CachingConnectionFactory
上設定等效屬性時,會傳遞許多設定屬性(例如,host
、port
、userName
、password
、requestedHeartBeat
和 connectionTimeout
)。要設定其他屬性(例如,clientProperties
),您可以定義 Rabbit 工厂的實例,並通過使用 CachingConnectionFactory
的適當建構子提供對它的參考。當使用命名空間(如先前所述)時,您需要在 connection-factory
屬性中提供對已設定工厂的參考。為了方便起見,提供了一個工厂 Bean 來協助在 Spring 應用程式上下文中設定連線工厂,如下一節中所討論的。
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 用戶端預設啟用自動恢復。雖然與此功能相容,但 Spring AMQP 有自己的恢復機制,並且通常不需要用戶端恢復功能。我們建議停用 amqp-client 自動恢復,以避免在 Broker 可用但連線尚未恢復時獲得 AutoRecoverConnectionNotCurrentlyOpenException 實例。例如,當在 RabbitTemplate 中設定 RetryTemplate 時,即使故障轉移到叢集中的另一個 Broker,您也可能會注意到此例外。由於自動恢復連線在計時器上恢復,因此通過使用 Spring AMQP 的恢復機制可以更快地恢復連線。從 1.7.1 版本開始,Spring AMQP 停用 amqp-client 自動恢復,除非您明確建立自己的 RabbitMQ 連線工厂並將其提供給 CachingConnectionFactory 。由 RabbitConnectionFactoryBean 建立的 RabbitMQ ConnectionFactory 實例也預設停用該選項。 |
RabbitConnectionFactoryBean
和設定 SSL
從 1.4 版本開始,提供了一個方便的 RabbitConnectionFactoryBean
,以通過依賴注入方便地在底層用戶端連線工厂上設定 SSL 屬性。其他 Setter 委託給底層工厂。以前,您必須以程式方式設定 SSL 選項。以下範例顯示如何設定 RabbitConnectionFactoryBean
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
<rabbit:connection-factory id="rabbitConnectionFactory"
connection-factory="clientConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
<bean id="clientConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
有關設定 SSL 的資訊,請參閱RabbitMQ 文件。省略 keyStore
和 trustStore
設定以通過 SSL 連線,而無需憑證驗證。下一個範例顯示如何提供金鑰和信任儲存區設定。
sslPropertiesLocation
屬性是一個 Spring Resource
,指向包含以下金鑰的屬性檔案
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
keyStore
和 truststore
是 Spring Resources
,指向儲存區。通常,此屬性檔案由作業系統保護,應用程式具有讀取權限。
從 Spring AMQP 1.5 版本開始,您可以直接在工厂 Bean 上設定這些屬性。如果同時提供了離散屬性和 sslPropertiesLocation
,則後者中的屬性會覆寫離散值。
從 2.0 版本開始,預設情況下會驗證伺服器憑證,因為它更安全。如果您希望由於某些原因跳過此驗證,請將工厂 Bean 的 skipServerCertificateValidation 屬性設定為 true 。從 2.1 版本開始,RabbitConnectionFactoryBean 現在預設呼叫 enableHostnameVerification() 。要恢復到先前的行為,請將 enableHostnameVerification 屬性設定為 false 。 |
從 2.2.5 版本開始,工厂 Bean 將始終預設使用 TLS v1.2;以前,在某些情況下它使用 v1.1,而在其他情況下使用 v1.2(取決於其他屬性)。如果您由於某些原因需要使用 v1.1,請設定 sslAlgorithm 屬性:setSslAlgorithm("TLSv1.1") 。 |
連線到叢集
要連線到叢集,請在 CachingConnectionFactory
上設定 addresses
屬性
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
從 3.0 版本開始,每當建立新連線時,底層連線工厂都會嘗試通過選擇隨機位址來連線到主機。要恢復為先前從第一個到最後一個嘗試連線的行為,請將 addressShuffleMode
屬性設定為 AddressShuffleMode.NONE
。
從 2.3 版本開始,新增了 INORDER
混洗模式,這意味著在建立連線後,第一個位址會移動到末尾。如果您希望與RabbitMQ Sharding Plugin 搭配使用 CacheMode.CONNECTION
和適當的並行性,以從所有節點上的所有分片中消費,您可能希望使用此模式。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
路由連線工厂
從 1.3 版本開始,引入了 AbstractRoutingConnectionFactory
。此工厂提供了一種機制,用於設定多個 ConnectionFactories
的對應,並在運行時通過某些 lookupKey
確定目標 ConnectionFactory
。通常,實作會檢查執行緒綁定的上下文。為了方便起見,Spring AMQP 提供了 SimpleRoutingConnectionFactory
,它從 SimpleResourceHolder
取得當前執行緒綁定的 lookupKey
。以下範例顯示如何在 XML 和 Java 中設定 SimpleRoutingConnectionFactory
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
使用後,務必解除綁定資源。有關更多資訊,請參閱 AbstractRoutingConnectionFactory
的JavaDoc。
從 1.4 版本開始,RabbitTemplate
支援 SpEL sendConnectionFactorySelectorExpression
和 receiveConnectionFactorySelectorExpression
屬性,這些屬性在每個 AMQP 協定互動操作(send
、sendAndReceive
、receive
或 receiveAndReply
)上評估,解析為提供的 AbstractRoutingConnectionFactory
的 lookupKey
值。您可以在表達式中使用 Bean 參考,例如 @vHostResolver.getVHost(#root)
。對於 send
操作,要傳送的訊息是根評估物件。對於 receive
操作,queueName
是根評估物件。
路由演算法如下:如果選擇器表達式為 null
或評估為 null
,或者提供的 ConnectionFactory
不是 AbstractRoutingConnectionFactory
的實例,則一切都像以前一樣工作,依賴於提供的 ConnectionFactory
實作。如果評估結果不為 null
,但該 lookupKey
沒有目標 ConnectionFactory
,並且 AbstractRoutingConnectionFactory
設定為 lenientFallback = true
,則也會發生同樣的情況。在 AbstractRoutingConnectionFactory
的情況下,它會根據 determineCurrentLookupKey()
回退到其 routing
實作。但是,如果 lenientFallback = false
,則會拋出 IllegalStateException
。
命名空間支援還在 <rabbit:template>
元件上提供了 send-connection-factory-selector-expression
和 receive-connection-factory-selector-expression
屬性。
此外,從 1.4 版本開始,您可以在監聽器容器中設定路由連線工厂。在這種情況下,佇列名稱列表用作查找金鑰。例如,如果您使用 setQueueNames("thing1", "thing2")
設定容器,則查找金鑰為 [thing1,thing]"
(請注意金鑰中沒有空格)。
從 1.6.9 版本開始,您可以通過在監聽器容器上使用 setLookupKeyQualifier
來為查找金鑰新增限定詞。這樣做可以啟用,例如,監聽具有相同名稱但在不同虛擬主機中的佇列(您將為每個虛擬主機擁有一個連線工厂)。
例如,使用查找金鑰限定詞 thing1
和監聽佇列 thing2
的容器,您可以將目標連線工厂註冊為 thing1[thing2]
。
目標(和預設,如果提供)連線工厂對於發布者確認和返回必須具有相同的設定。請參閱發布者確認和返回。 |
從 2.4.4 版本開始,可以停用此驗證。如果您遇到確認和返回之間的值需要不相等的情況,可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns
來關閉驗證。請注意,新增到 AbstractRoutingConnectionFactory
的第一個連線工厂將決定 confirms
和 returns
的一般值。
如果您遇到某些訊息您想要檢查確認/返回而其他訊息您不想檢查的情況,這可能很有用。例如
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
這樣,帶有標頭 x-use-publisher-confirms: true
的訊息將通過快取連線傳送,您可以確保訊息傳遞。有關確保訊息傳遞的更多資訊,請參閱發布者確認和返回。
佇列親和性與 LocalizedQueueConnectionFactory
當在叢集中使用 HA 佇列時,為了獲得最佳效能,您可能希望連線到領導佇列所在的物理 Broker。CachingConnectionFactory
可以設定多個 Broker 位址。這是為了故障轉移,並且用戶端嘗試按照設定的 AddressShuffleMode
順序連線。LocalizedQueueConnectionFactory
使用管理外掛程式提供的 REST API 來確定哪個節點是佇列的領導者。然後,它建立(或從快取中檢索)一個僅連線到該節點的 CachingConnectionFactory
。如果連線失敗,則確定新的領導節點,並且消費者連線到它。LocalizedQueueConnectionFactory
設定了預設連線工厂,以防無法確定佇列的物理位置,在這種情況下,它像往常一樣連線到叢集。
LocalizedQueueConnectionFactory
是一個 RoutingConnectionFactory
,並且 SimpleMessageListenerContainer
使用佇列名稱作為查找金鑰,如上面的路由連線工厂中所討論的。
由於這個原因(使用佇列名稱進行查找),僅當容器設定為監聽單一佇列時,才能使用 LocalizedQueueConnectionFactory 。 |
RabbitMQ 管理外掛程式必須在每個節點上啟用。 |
此連線工厂旨在用於長生命週期的連線,例如 SimpleMessageListenerContainer 使用的連線。它不適用於短連線使用,例如 RabbitTemplate ,因為在建立連線之前調用 REST API 會產生開銷。此外,對於發布操作,佇列是未知的,並且訊息無論如何都會發布到所有叢集成員,因此查找節點的邏輯幾乎沒有價值。 |
以下範例設定顯示如何設定工厂
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
請注意,前三個參數是 addresses
、adminUris
和 nodes
的陣列。這些是位置性的,因為當容器嘗試連線到佇列時,它會使用管理 API 來確定哪個節點是佇列的領導者,並連線到與該節點在相同陣列位置的位址。
從 3.0 版本開始,不再使用 RabbitMQ http-client 來存取 Rest API。相反,預設情況下,如果類別路徑上有 spring-webflux ,則使用 Spring Webflux 中的 WebClient ;否則,使用 RestTemplate 。 |
要將 WebFlux
新增到類別路徑
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
compile 'org.springframework.amqp:spring-rabbit'
您也可以通過實作 LocalizedQueueConnectionFactory.NodeLocator
並覆寫其 createClient, ``restCall
以及可選的 close
方法來使用其他 REST 技術。
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public HashMap<String, Object> restCall(MyClient client, URI uri) {
...
});
});
框架提供了 WebFluxNodeLocator
和 RestTemplateNodeLocator
,預設值如上所述。
發布者確認和返回
透過將 CachingConnectionFactory
屬性 publisherConfirmType
設定為 ConfirmType.CORRELATED
,以及將 publisherReturns
屬性設定為 'true',即可支援已確認(帶關聯)和退回的消息。
當設定這些選項時,工廠建立的 Channel
實例會包裝在 PublisherCallbackChannel
中,用於協助回呼。當取得此類通道時,用戶端可以向 Channel
註冊 PublisherCallbackChannel.Listener
。PublisherCallbackChannel
實作包含將確認或退回路由至適當監聽器的邏輯。這些功能將在以下章節中進一步說明。
另請參閱 關聯發布者確認和退回 以及 範圍操作 中的 simplePublisherConfirms
。
如需更多背景資訊,請參閱 RabbitMQ 團隊的部落格文章,標題為 介紹發布者確認。 |
連線和通道監聽器
連線工廠支援註冊 ConnectionListener
和 ChannelListener
實作。這可讓您接收連線和通道相關事件的通知。(ConnectionListener
由 RabbitAdmin
使用,在建立連線時執行宣告 - 有關更多資訊,請參閱 交換器、佇列和綁定的自動宣告)。以下清單顯示了 ConnectionListener
介面定義
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
從 2.0 版開始,org.springframework.amqp.rabbit.connection.Connection
物件可以提供 com.rabbitmq.client.BlockedListener
實例,以便在連線封鎖和解除封鎖事件時收到通知。以下範例顯示了 ChannelListener 介面定義
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
有關您可能想要註冊 ChannelListener
的一種情境,請參閱 發布是異步的 — 如何偵測成功和失敗。
記錄通道關閉事件
1.5 版引入了一種機制,讓使用者可以控制記錄層級。
AbstractConnectionFactory
使用預設策略來記錄通道關閉,如下所示
-
正常的通道關閉 (200 OK) 不會記錄。
-
如果通道因被動佇列宣告失敗而關閉,則會在 DEBUG 層級記錄。
-
如果通道因
basic.consume
因獨佔消費者條件而被拒絕而關閉,則會在 DEBUG 層級記錄(自 3.1 起,先前為 INFO)。 -
所有其他情況都在 ERROR 層級記錄。
若要修改此行為,您可以將自訂 ConditionalExceptionLogger
注入到 CachingConnectionFactory
的 closeExceptionLogger
屬性中。
此外,AbstractConnectionFactory.DefaultChannelCloseLogger
現在是公開的,允許對其進行子類別化。
另請參閱 消費者事件。
執行階段快取屬性
從 1.6 版開始,CachingConnectionFactory
現在透過 getCacheProperties()
方法提供快取統計資訊。這些統計資訊可用於調整快取,以在生產環境中進行最佳化。例如,高水位標記可用於判斷是否應增加快取大小。如果它等於快取大小,您可能需要考慮進一步增加。下表描述了 CacheMode.CHANNEL
屬性
屬性 | 含義 |
---|---|
connectionName |
由 |
channelCacheSize |
目前設定的最大允許閒置通道數。 |
localPort |
連線的本機埠(如果可用)。這可用於與 RabbitMQ 管理 UI 上的連線和通道建立關聯。 |
idleChannelsTx |
目前閒置(已快取)的交易通道數。 |
idleChannelsNotTx |
目前閒置(已快取)的非交易通道數。 |
idleChannelsTxHighWater |
同時閒置(已快取)的交易通道最大數量。 |
idleChannelsNotTxHighWater |
同時閒置(已快取)的非交易通道最大數量。 |
下表描述了 CacheMode.CONNECTION
屬性
屬性 | 含義 |
---|---|
connectionName:<localPort> |
由 |
openConnections |
代表與 Broker 連線的連線物件數量。 |
channelCacheSize |
目前設定的最大允許閒置通道數。 |
connectionCacheSize |
目前設定的最大允許閒置連線數。 |
idleConnections |
目前閒置的連線數。 |
idleConnectionsHighWater |
同時閒置的連線最大數量。 |
idleChannelsTx:<localPort> |
此連線目前閒置(已快取)的交易通道數。您可以使用屬性名稱的 |
idleChannelsNotTx:<localPort> |
此連線目前閒置(已快取)的非交易通道數。屬性名稱的 |
idleChannelsTxHighWater:<localPort> |
同時閒置(已快取)的交易通道最大數量。屬性名稱的 localPort 部分可用於與 RabbitMQ 管理 UI 上的連線和通道建立關聯。 |
idleChannelsNotTxHighWater:<localPort> |
同時閒置(已快取)的非交易通道最大數量。您可以使用屬性名稱的 |
也包含 cacheMode
屬性(CHANNEL
或 CONNECTION
)。

RabbitMQ 自動連線/拓撲恢復
自 Spring AMQP 的第一個版本以來,該框架已在 Broker 故障時提供自己的連線和通道恢復。此外,如 設定 Broker 中所述,當重新建立連線時,RabbitAdmin
會重新宣告任何基礎結構 Bean(佇列等)。因此,它不依賴 amqp-client
程式庫現在提供的 自動恢復。amqp-client
預設已啟用自動恢復。兩種恢復機制之間存在一些不相容性,因此,預設情況下,Spring 會將底層 RabbitMQ connectionFactory
上的 automaticRecoveryEnabled
屬性設定為 false
。即使該屬性為 true
,Spring 也會立即關閉任何已恢復的連線,從而有效地禁用它。
預設情況下,只有定義為 Bean 的元素(佇列、交換器、綁定)會在連線失敗後重新宣告。請參閱 恢復自動刪除宣告,以了解如何變更該行為。 |