設定 Broker
AMQP 規範描述了如何使用協定在 Broker 上設定佇列、交換器和繫結。這些操作(可從 0.8 規範及更高版本移植)存在於 org.springframework.amqp.core
套件中的 AmqpAdmin
介面中。該類別的 RabbitMQ 實作是位於 org.springframework.amqp.rabbit.core
套件中的 RabbitAdmin
。
AmqpAdmin
介面基於使用 Spring AMQP 網域抽象概念,如下清單所示
public interface AmqpAdmin {
// Exchange Operations
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
// Queue Operations
Queue declareQueue();
String declareQueue(Queue queue);
void deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
// Binding Operations
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
}
另請參閱 範圍操作。
getQueueProperties()
方法傳回關於佇列的一些有限資訊(訊息計數和消費者計數)。傳回的屬性的鍵作為常數在 RabbitTemplate
中可用 (QUEUE_NAME
、QUEUE_MESSAGE_COUNT
和 QUEUE_CONSUMER_COUNT
)。RabbitMQ REST API 在 QueueInfo
物件中提供更多資訊。
無引數 declareQueue()
方法在 Broker 上定義一個具有自動產生名稱的佇列。此自動產生佇列的其他屬性為 exclusive=true
、autoDelete=true
和 durable=false
。
declareQueue(Queue queue)
方法採用 Queue
物件並傳回宣告的佇列名稱。如果提供的 Queue
的 name
屬性是空 String
,則 Broker 會宣告具有產生名稱的佇列。該名稱會傳回給呼叫者。該名稱也會新增至 Queue
的 actualName
屬性。您只能透過直接調用 RabbitAdmin
以程式化方式使用此功能。當在應用程式內容中宣告式定義佇列時使用管理員自動宣告時,您可以將 name 屬性設定為 ""
(空字串)。然後 Broker 會建立名稱。從 2.1 版開始,監聽器容器可以使用此類型的佇列。有關更多資訊,請參閱 容器與 Broker 命名佇列。
這與 AnonymousQueue
形成對比,其中框架產生唯一 (UUID
) 名稱並將 durable
設定為 false
,以及 exclusive
、autoDelete
設定為 true
。具有空 (或遺失) name
屬性的 <rabbit:queue/>
始終建立 AnonymousQueue
。
請參閱 AnonymousQueue
以了解為何 AnonymousQueue
比 Broker 產生的佇列名稱更受青睞,以及如何控制名稱的格式。從 2.1 版開始,匿名佇列在宣告時,預設情況下會將引數 Queue.X_QUEUE_LEADER_LOCATOR
設定為 client-local
。這確保佇列在應用程式連線的節點上宣告。宣告式佇列必須具有固定的名稱,因為它們可能在內容中的其他地方被引用 — 例如在以下範例中顯示的監聽器中
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>
請參閱 交換器、佇列和繫結的自動宣告。
此介面的 RabbitMQ 實作是 RabbitAdmin
,當使用 Spring XML 設定時,類似於以下範例
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
當 CachingConnectionFactory
快取模式為 CHANNEL
(預設值) 時,RabbitAdmin
實作會自動延遲宣告在相同 ApplicationContext
中宣告的佇列、交換器和繫結。這些元件會在 Connection
開啟到 Broker 後立即宣告。有一些命名空間功能使這非常方便 — 例如,在 Stocks 範例應用程式中,我們有以下內容
<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
在前面的範例中,我們使用匿名佇列(實際上,在內部,只是名稱由框架而非 Broker 產生的佇列)並通過 ID 引用它們。我們還可以宣告具有顯式名稱的佇列,這些名稱也充當它們在內容中 Bean 定義的識別符。以下範例設定具有顯式名稱的佇列
<rabbit:queue name="stocks.trade.queue"/>
您可以同時提供 id 和 name 屬性。這讓您可以通過獨立於佇列名稱的 ID 來引用佇列(例如,在繫結中)。它還允許標準 Spring 功能(例如佇列名稱的屬性佔位符和 SpEL 表達式)。當您使用名稱作為 Bean 識別符時,這些功能不可用。 |
佇列可以使用其他引數進行設定 — 例如,x-message-ttl
。當您使用命名空間支援時,它們以引數名稱/引數值對的 Map
形式提供,這些引數名稱/引數值對通過使用 <rabbit:queue-arguments>
元素定義。以下範例示範如何執行此操作
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
預設情況下,引數被假定為字串。對於其他類型的引數,您必須提供類型。以下範例示範如何指定類型
<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
當提供混合類型的引數時,您必須為每個項目元素提供類型。以下範例示範如何執行此操作
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
使用 Spring Framework 3.2 及更高版本,可以更簡潔地宣告此項,如下所示
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
當您使用 Java 設定時,Queue.X_QUEUE_LEADER_LOCATOR
引數作為第一類屬性通過 Queue
類別上的 setLeaderLocator()
方法支援。從 2.1 版開始,匿名佇列在宣告時,預設情況下會將此屬性設定為 client-local
。這確保佇列在應用程式連線的節點上宣告。
RabbitMQ Broker 不允許宣告具有不匹配引數的佇列。例如,如果 queue 已經存在且沒有 time to live 引數,並且您嘗試使用(例如)key="x-message-ttl" value="100" 宣告它,則會擲回例外。 |
預設情況下,當任何例外發生時,RabbitAdmin
會立即停止處理所有宣告。這可能會導致下游問題,例如監聽器容器無法初始化,因為另一個佇列(在錯誤佇列之後定義)未宣告。
可以通過在 RabbitAdmin
實例上將 ignore-declaration-exceptions
屬性設定為 true
來修改此行為。此選項指示 RabbitAdmin
記錄例外並繼續宣告其他元素。當使用 Java 設定 RabbitAdmin
時,此屬性稱為 ignoreDeclarationExceptions
。這是一個適用於所有元素的全域設定。佇列、交換器和繫結具有類似的屬性,該屬性僅適用於這些元素。
在 1.6 版之前,僅當通道上發生 IOException
時,此屬性才生效,例如當目前屬性與所需屬性之間存在不匹配時。現在,此屬性在任何例外情況下都生效,包括 TimeoutException
和其他例外。
此外,任何宣告例外都會導致發布 DeclarationExceptionEvent
,這是一個可以被內容中的任何 ApplicationListener
消費的 ApplicationEvent
。該事件包含對管理員、正在宣告的元素和 Throwable
的引用。
標頭交換器
從 1.3 版開始,您可以設定 HeadersExchange
以匹配多個標頭。您還可以指定是否必須匹配任何或所有標頭。以下範例示範如何執行此操作
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entry key="foo" value="bar"/>
<entry key="baz" value="qux"/>
<entry key="x-match" value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
從 1.6 版開始,您可以使用 internal
標誌(預設為 false
)設定 Exchanges
,並且此類 Exchange
通過 RabbitAdmin
在 Broker 上正確設定(如果應用程式內容中存在一個)。如果交換器的 internal
標誌為 true
,則 RabbitMQ 不允許用戶端使用該交換器。這對於死信交換器或交換器到交換器的繫結很有用,在這種情況下,您不希望交換器被發布者直接使用。
要查看如何使用 Java 設定 AMQP 基礎架構,請查看 Stock 範例應用程式,其中有 @Configuration
類別 AbstractStockRabbitConfiguration
,而它又具有 RabbitClientConfiguration
和 RabbitServerConfiguration
子類別。以下清單顯示了 AbstractStockRabbitConfiguration
的程式碼
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
}
// additional code omitted for brevity
}
在 Stock 應用程式中,伺服器通過使用以下 @Configuration
類別進行設定
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
return new Queue("app.stock.request");
}
}
這是 @Configuration
類別的整個繼承鏈的末端。最終結果是 TopicExchange
和 Queue
在應用程式啟動時宣告給 Broker。在伺服器設定中,TopicExchange
沒有與佇列繫結,因為這是在用戶端應用程式中完成的。然而,股票請求佇列會自動繫結到 AMQP 預設交換器。此行為由規範定義。
用戶端 @Configuration
類別更有趣一些。其宣告如下
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
// additional code omitted for brevity
}
用戶端通過 AmqpAdmin
上的 declareQueue()
方法宣告另一個佇列。它將該佇列繫結到市場資料交換器,其路由模式在屬性檔案中外部化。
佇列和交換器的 Builder API
1.6 版引入了一個方便的流暢 API,用於在使用 Java 設定時設定 Queue
和 Exchange
物件。以下範例示範如何使用它
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}
有關更多資訊,請參閱 org.springframework.amqp.core.QueueBuilder
和 org.springframework.amqp.core.ExchangeBuilder
的 Javadoc。
從 2.0 版開始,ExchangeBuilder
現在預設情況下建立持久交換器,以與各個 AbstractExchange
類別上的簡單建構函式保持一致。要使用 Builder 建立非持久交換器,請在使用 .build()
之前使用 .durable(false)
。不再提供沒有參數的 durable()
方法。
2.2 版引入了流暢 API,用於新增「眾所周知」的交換器和佇列引數…
@Bean
public Queue allArgs1() {
return QueueBuilder.nonDurable("all.args.1")
.ttl(1000)
.expires(200_000)
.maxLength(42)
.maxLengthBytes(10_000)
.overflow(Overflow.rejectPublish)
.deadLetterExchange("dlx")
.deadLetterRoutingKey("dlrk")
.maxPriority(4)
.lazy()
.leaderLocator(LeaderLocator.minLeaders)
.singleActiveConsumer()
.build();
}
@Bean
public DirectExchange ex() {
return ExchangeBuilder.directExchange("ex.with.alternate")
.durable(true)
.alternate("alternate")
.build();
}
宣告交換器、佇列和繫結的集合
您可以將 Declarable
物件(Queue
、Exchange
和 Binding
)的集合包裝在 Declarables
物件中。RabbitAdmin
在應用程式內容中偵測到此類 Bean(以及離散的 Declarable
Bean),並在建立連線時(最初和連線失敗後)在 Broker 上宣告包含的物件。以下範例示範如何執行此操作
@Configuration
public static class Config {
@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
}
@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
}
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
@Bean
public Declarables es() {
return new Declarables(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true));
}
@Bean
public Declarables qs() {
return new Declarables(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true));
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Declarables prototypes() {
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
}
@Bean
public Declarables bs() {
return new Declarables(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
}
@Bean
public Declarables ds() {
return new Declarables(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
}
}
在 2.1 之前的版本中,您可以通過定義 Collection<Declarable> 類型的 Bean 來宣告多個 Declarable 實例。在某些情況下,這可能會導致不良的副作用,因為管理員必須迭代所有 Collection<?> Bean。 |
2.2 版將 getDeclarablesByType
方法新增至 Declarables
;例如,在宣告監聽器容器 Bean 時,可以將其用作便利方法。
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Declarables mixedDeclarables, MessageListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
container.setMessageListener(listener);
return container;
}
條件式宣告
預設情況下,所有佇列、交換器和繫結都由應用程式內容中的所有 RabbitAdmin
實例宣告(假設它們具有 auto-startup="true"
)。
從 2.1.9 版開始,RabbitAdmin
有一個新的屬性 explicitDeclarationsOnly
(預設為 false
);當此屬性設定為 true
時,管理員將僅宣告明確配置為由該管理員宣告的 Bean。
從 1.2 版本開始,您可以有條件地宣告這些元素。當應用程式連線到多個 Broker 並需要指定特定元素應與哪個 Broker 宣告時,這特別有用。 |
代表這些元素的類別實作 Declarable
,它有兩個方法:shouldDeclare()
和 getDeclaringAdmins()
。RabbitAdmin
使用這些方法來確定特定實例是否應實際處理其 Connection
上的宣告。
這些屬性在命名空間中作為屬性可用,如以下範例所示
<rabbit:admin id="admin1" connection-factory="CF1" />
<rabbit:admin id="admin2" connection-factory="CF2" />
<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />
<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />
<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:binding key="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
預設情況下,auto-declare 屬性為 true ,並且如果未提供 declared-by (或為空),則所有 RabbitAdmin 實例都會宣告物件(只要管理員的 auto-startup 屬性為 true ,預設值,並且管理員的 explicit-declarations-only 屬性為 false)。 |
同樣地,您可以使用基於 Java 的 @Configuration
來達到相同的效果。在以下範例中,元件由 admin1
宣告,但不由 admin2
宣告
@Bean
public RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}
@Bean
public RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin1());
return exchange;
}
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin1());
return binding;
}
關於 id
和 name
屬性的注意事項
<rabbit:queue/>
和 <rabbit:exchange/>
元素上的 name
屬性反映了 Broker 中實體的名稱。對於佇列,如果省略 name
,則會建立匿名佇列(請參閱 AnonymousQueue
)。
在 2.0 之前的版本中,name
也註冊為 Bean 名稱別名(類似於 <bean/>
元素上的 name
)。
這導致了兩個問題
-
它阻止了宣告具有相同名稱的佇列和交換器。
-
如果別名包含 SpEL 表達式 (
#{…}
),則無法解析別名。
從 2.0 版開始,如果您宣告其中一個元素同時具有 id
*和* name
屬性,則名稱不再宣告為 Bean 名稱別名。如果您希望宣告具有相同 name
的佇列和交換器,則必須提供 id
。
如果元素僅具有 name
屬性,則沒有變更。Bean 仍然可以通過 name
引用 — 例如,在繫結宣告中。但是,如果名稱包含 SpEL,您仍然無法引用它 — 您必須提供 id
以供參考。
AnonymousQueue
一般而言,當您需要唯一命名、獨佔、自動刪除的佇列時,我們建議您使用 AnonymousQueue
而不是 Broker 定義的佇列名稱(使用 ""
作為 Queue
名稱會導致 Broker 產生佇列名稱)。
這是因為
-
佇列實際上是在建立與 Broker 的連線時宣告的。這遠在 Bean 建立並連接在一起之後。使用佇列的 Bean 需要知道其名稱。事實上,當應用程式啟動時,Broker 甚至可能沒有運行。
-
如果與 Broker 的連線因某種原因遺失,管理員會使用相同的名稱重新宣告
AnonymousQueue
。如果我們使用 Broker 宣告的佇列,佇列名稱會變更。
您可以控制 AnonymousQueue
實例使用的佇列名稱格式。
預設情況下,佇列名稱以 spring.gen-
為前綴,後跟 UUID
的 base64 表示形式 — 例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
。
您可以在建構函式引數中提供 AnonymousQueue.NamingStrategy
實作。以下範例示範如何執行此操作
@Bean
public Queue anon1() {
return new AnonymousQueue();
}
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}
@Bean
public Queue anon3() {
return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}
第一個 Bean 產生一個以 spring.gen-
為前綴的佇列名稱,後跟 UUID
的 base64 表示形式 — 例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
。第二個 Bean 產生一個以 something-
為前綴的佇列名稱,後跟 UUID
的 base64 表示形式。第三個 Bean 僅使用 UUID(沒有 base64 轉換)產生一個名稱 — 例如,f20c818a-006b-4416-bf91-643590fedb0e
。
base64 編碼使用 RFC 4648 中的「URL 和檔案名稱安全字母表」。尾隨填充字元 (=
) 會被移除。
您可以提供自己的命名策略,您可以藉此在佇列名稱中包含其他資訊(例如應用程式名稱或用戶端主機)。
當您使用 XML 設定時,可以指定命名策略。naming-strategy
屬性存在於 <rabbit:queue>
元素上,用於實作 AnonymousQueue.NamingStrategy
的 Bean 參考。以下範例示範如何以各種方式指定命名策略
<rabbit:queue id="uuidAnon" />
<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />
<rabbit:queue id="customAnon" naming-strategy="customNamer" />
<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>
第一個範例建立諸如 spring.gen-MRBv9sqISkuCiPfOYfpo4g
之類的名稱。第二個範例建立具有 UUID 字串表示形式的名稱。第三個範例建立諸如 custom.gen-MRBv9sqISkuCiPfOYfpo4g
之類的名稱。
您還可以提供自己的命名策略 Bean。
從 2.1 版開始,匿名佇列在宣告時,預設情況下會將引數 Queue.X_QUEUE_LEADER_LOCATOR
設定為 client-local
。這確保佇列在應用程式連線的節點上宣告。您可以在建構實例後通過調用 queue.setLeaderLocator(null)
來恢復到先前的行為。
恢復自動刪除宣告
通常,RabbitAdmin
僅恢復在應用程式內容中宣告為 Bean 的佇列/交換器/繫結;如果任何此類宣告是自動刪除的,則如果連線遺失,它們將被 Broker 移除。當重新建立連線時,管理員將重新宣告實體。通常,通過調用 admin.declareQueue(…)
、admin.declareExchange(…)
和 admin.declareBinding(…)
建立的實體將不會被恢復。
從 2.4 版開始,管理員有一個新的屬性 redeclareManualDeclarations
;當為 true
時,管理員除了恢復應用程式內容中的 Bean 外,還將恢復這些實體。
如果調用 deleteQueue(…)
、deleteExchange(…)
或 removeBinding(…)
,則不會執行個別宣告的恢復。當佇列和交換器被刪除時,相關的繫結會從可恢復的實體中移除。
最後,調用 resetAllManualDeclarations()
將阻止恢復任何先前宣告的實體。