設定 Broker

AMQP 規範描述了如何使用協定在 Broker 上設定佇列、交換器和繫結。這些操作(可從 0.8 規範及更高版本移植)存在於 org.springframework.amqp.core 套件中的 AmqpAdmin 介面中。該類別的 RabbitMQ 實作是位於 org.springframework.amqp.rabbit.core 套件中的 RabbitAdmin

AmqpAdmin 介面基於使用 Spring AMQP 網域抽象概念,如下清單所示

public interface AmqpAdmin {

    // Exchange Operations

    void declareExchange(Exchange exchange);

    void deleteExchange(String exchangeName);

    // Queue Operations

    Queue declareQueue();

    String declareQueue(Queue queue);

    void deleteQueue(String queueName);

    void deleteQueue(String queueName, boolean unused, boolean empty);

    void purgeQueue(String queueName, boolean noWait);

    // Binding Operations

    void declareBinding(Binding binding);

    void removeBinding(Binding binding);

    Properties getQueueProperties(String queueName);

}

另請參閱 範圍操作

getQueueProperties() 方法傳回關於佇列的一些有限資訊(訊息計數和消費者計數)。傳回的屬性的鍵作為常數在 RabbitTemplate 中可用 (QUEUE_NAMEQUEUE_MESSAGE_COUNTQUEUE_CONSUMER_COUNT)。RabbitMQ REST APIQueueInfo 物件中提供更多資訊。

無引數 declareQueue() 方法在 Broker 上定義一個具有自動產生名稱的佇列。此自動產生佇列的其他屬性為 exclusive=trueautoDelete=truedurable=false

declareQueue(Queue queue) 方法採用 Queue 物件並傳回宣告的佇列名稱。如果提供的 Queuename 屬性是空 String,則 Broker 會宣告具有產生名稱的佇列。該名稱會傳回給呼叫者。該名稱也會新增至 QueueactualName 屬性。您只能透過直接調用 RabbitAdmin 以程式化方式使用此功能。當在應用程式內容中宣告式定義佇列時使用管理員自動宣告時,您可以將 name 屬性設定為 "" (空字串)。然後 Broker 會建立名稱。從 2.1 版開始,監聽器容器可以使用此類型的佇列。有關更多資訊,請參閱 容器與 Broker 命名佇列

這與 AnonymousQueue 形成對比,其中框架產生唯一 (UUID) 名稱並將 durable 設定為 false,以及 exclusiveautoDelete 設定為 true。具有空 (或遺失) name 屬性的 <rabbit:queue/> 始終建立 AnonymousQueue

請參閱 AnonymousQueue 以了解為何 AnonymousQueue 比 Broker 產生的佇列名稱更受青睞,以及如何控制名稱的格式。從 2.1 版開始,匿名佇列在宣告時,預設情況下會將引數 Queue.X_QUEUE_LEADER_LOCATOR 設定為 client-local。這確保佇列在應用程式連線的節點上宣告。宣告式佇列必須具有固定的名稱,因為它們可能在內容中的其他地方被引用 — 例如在以下範例中顯示的監聽器中

<rabbit:listener-container>
    <rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>

此介面的 RabbitMQ 實作是 RabbitAdmin,當使用 Spring XML 設定時,類似於以下範例

<rabbit:connection-factory id="connectionFactory"/>

<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

CachingConnectionFactory 快取模式為 CHANNEL (預設值) 時,RabbitAdmin 實作會自動延遲宣告在相同 ApplicationContext 中宣告的佇列、交換器和繫結。這些元件會在 Connection 開啟到 Broker 後立即宣告。有一些命名空間功能使這非常方便 — 例如,在 Stocks 範例應用程式中,我們有以下內容

<rabbit:queue id="tradeQueue"/>

<rabbit:queue id="marketDataQueue"/>

<fanout-exchange name="broadcast.responses"
                 xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="tradeQueue"/>
    </bindings>
</fanout-exchange>

<topic-exchange name="app.stock.marketdata"
                xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
    </bindings>
</topic-exchange>

在前面的範例中,我們使用匿名佇列(實際上,在內部,只是名稱由框架而非 Broker 產生的佇列)並通過 ID 引用它們。我們還可以宣告具有顯式名稱的佇列,這些名稱也充當它們在內容中 Bean 定義的識別符。以下範例設定具有顯式名稱的佇列

<rabbit:queue name="stocks.trade.queue"/>
您可以同時提供 idname 屬性。這讓您可以通過獨立於佇列名稱的 ID 來引用佇列(例如,在繫結中)。它還允許標準 Spring 功能(例如佇列名稱的屬性佔位符和 SpEL 表達式)。當您使用名稱作為 Bean 識別符時,這些功能不可用。

佇列可以使用其他引數進行設定 — 例如,x-message-ttl。當您使用命名空間支援時,它們以引數名稱/引數值對的 Map 形式提供,這些引數名稱/引數值對通過使用 <rabbit:queue-arguments> 元素定義。以下範例示範如何執行此操作

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

預設情況下,引數被假定為字串。對於其他類型的引數,您必須提供類型。以下範例示範如何指定類型

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments value-type="java.lang.Long">
        <entry key="x-message-ttl" value="100"/>
    </rabbit:queue-arguments>
</rabbit:queue>

當提供混合類型的引數時,您必須為每個項目元素提供類型。以下範例示範如何執行此操作

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
            <value type="java.lang.Long">100</value>
        </entry>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

使用 Spring Framework 3.2 及更高版本,可以更簡潔地宣告此項,如下所示

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
        <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
</rabbit:queue>

當您使用 Java 設定時,Queue.X_QUEUE_LEADER_LOCATOR 引數作為第一類屬性通過 Queue 類別上的 setLeaderLocator() 方法支援。從 2.1 版開始,匿名佇列在宣告時,預設情況下會將此屬性設定為 client-local。這確保佇列在應用程式連線的節點上宣告。

RabbitMQ Broker 不允許宣告具有不匹配引數的佇列。例如,如果 queue 已經存在且沒有 time to live 引數,並且您嘗試使用(例如)key="x-message-ttl" value="100" 宣告它,則會擲回例外。

預設情況下,當任何例外發生時,RabbitAdmin 會立即停止處理所有宣告。這可能會導致下游問題,例如監聽器容器無法初始化,因為另一個佇列(在錯誤佇列之後定義)未宣告。

可以通過在 RabbitAdmin 實例上將 ignore-declaration-exceptions 屬性設定為 true 來修改此行為。此選項指示 RabbitAdmin 記錄例外並繼續宣告其他元素。當使用 Java 設定 RabbitAdmin 時,此屬性稱為 ignoreDeclarationExceptions。這是一個適用於所有元素的全域設定。佇列、交換器和繫結具有類似的屬性,該屬性僅適用於這些元素。

在 1.6 版之前,僅當通道上發生 IOException 時,此屬性才生效,例如當目前屬性與所需屬性之間存在不匹配時。現在,此屬性在任何例外情況下都生效,包括 TimeoutException 和其他例外。

此外,任何宣告例外都會導致發布 DeclarationExceptionEvent,這是一個可以被內容中的任何 ApplicationListener 消費的 ApplicationEvent。該事件包含對管理員、正在宣告的元素和 Throwable 的引用。

標頭交換器

從 1.3 版開始,您可以設定 HeadersExchange 以匹配多個標頭。您還可以指定是否必須匹配任何或所有標頭。以下範例示範如何執行此操作

<rabbit:headers-exchange name="headers-test">
    <rabbit:bindings>
        <rabbit:binding queue="bucket">
            <rabbit:binding-arguments>
                <entry key="foo" value="bar"/>
                <entry key="baz" value="qux"/>
                <entry key="x-match" value="all"/>
            </rabbit:binding-arguments>
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:headers-exchange>

從 1.6 版開始,您可以使用 internal 標誌(預設為 false)設定 Exchanges,並且此類 Exchange 通過 RabbitAdmin 在 Broker 上正確設定(如果應用程式內容中存在一個)。如果交換器的 internal 標誌為 true,則 RabbitMQ 不允許用戶端使用該交換器。這對於死信交換器或交換器到交換器的繫結很有用,在這種情況下,您不希望交換器被發布者直接使用。

要查看如何使用 Java 設定 AMQP 基礎架構,請查看 Stock 範例應用程式,其中有 @Configuration 類別 AbstractStockRabbitConfiguration,而它又具有 RabbitClientConfigurationRabbitServerConfiguration 子類別。以下清單顯示了 AbstractStockRabbitConfiguration 的程式碼

@Configuration
public abstract class AbstractStockAppRabbitConfiguration {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        configureRabbitTemplate(template);
        return template;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public TopicExchange marketDataExchange() {
        return new TopicExchange("app.stock.marketdata");
    }

    // additional code omitted for brevity

}

在 Stock 應用程式中,伺服器通過使用以下 @Configuration 類別進行設定

@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration  {

    @Bean
    public Queue stockRequestQueue() {
        return new Queue("app.stock.request");
    }
}

這是 @Configuration 類別的整個繼承鏈的末端。最終結果是 TopicExchangeQueue 在應用程式啟動時宣告給 Broker。在伺服器設定中,TopicExchange 沒有與佇列繫結,因為這是在用戶端應用程式中完成的。然而,股票請求佇列會自動繫結到 AMQP 預設交換器。此行為由規範定義。

用戶端 @Configuration 類別更有趣一些。其宣告如下

@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {

    @Value("${stocks.quote.pattern}")
    private String marketDataRoutingKey;

    @Bean
    public Queue marketDataQueue() {
        return amqpAdmin().declareQueue();
    }

    /**
     * Binds to the market data exchange.
     * Interested in any stock quotes
     * that match its routing key.
     */
    @Bean
    public Binding marketDataBinding() {
        return BindingBuilder.bind(
                marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
    }

    // additional code omitted for brevity

}

用戶端通過 AmqpAdmin 上的 declareQueue() 方法宣告另一個佇列。它將該佇列繫結到市場資料交換器,其路由模式在屬性檔案中外部化。

佇列和交換器的 Builder API

1.6 版引入了一個方便的流暢 API,用於在使用 Java 設定時設定 QueueExchange 物件。以下範例示範如何使用它

@Bean
public Queue queue() {
    return QueueBuilder.nonDurable("foo")
        .autoDelete()
        .exclusive()
        .withArgument("foo", "bar")
        .build();
}

@Bean
public Exchange exchange() {
  return ExchangeBuilder.directExchange("foo")
      .autoDelete()
      .internal()
      .withArgument("foo", "bar")
      .build();
}

從 2.0 版開始,ExchangeBuilder 現在預設情況下建立持久交換器,以與各個 AbstractExchange 類別上的簡單建構函式保持一致。要使用 Builder 建立非持久交換器,請在使用 .build() 之前使用 .durable(false)。不再提供沒有參數的 durable() 方法。

2.2 版引入了流暢 API,用於新增「眾所周知」的交換器和佇列引數…​

@Bean
public Queue allArgs1() {
    return QueueBuilder.nonDurable("all.args.1")
            .ttl(1000)
            .expires(200_000)
            .maxLength(42)
            .maxLengthBytes(10_000)
            .overflow(Overflow.rejectPublish)
            .deadLetterExchange("dlx")
            .deadLetterRoutingKey("dlrk")
            .maxPriority(4)
            .lazy()
            .leaderLocator(LeaderLocator.minLeaders)
            .singleActiveConsumer()
            .build();
}

@Bean
public DirectExchange ex() {
    return ExchangeBuilder.directExchange("ex.with.alternate")
            .durable(true)
            .alternate("alternate")
            .build();
}

宣告交換器、佇列和繫結的集合

您可以將 Declarable 物件(QueueExchangeBinding)的集合包裝在 Declarables 物件中。RabbitAdmin 在應用程式內容中偵測到此類 Bean(以及離散的 Declarable Bean),並在建立連線時(最初和連線失敗後)在 Broker 上宣告包含的物件。以下範例示範如何執行此操作

@Configuration
public static class Config {

    @Bean
    public CachingConnectionFactory cf() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
        return new RabbitAdmin(cf);
    }

    @Bean
    public DirectExchange e1() {
        return new DirectExchange("e1", false, true);
    }

    @Bean
    public Queue q1() {
        return new Queue("q1", false, false, true);
    }

    @Bean
    public Binding b1() {
        return BindingBuilder.bind(q1()).to(e1()).with("k1");
    }

    @Bean
    public Declarables es() {
        return new Declarables(
                new DirectExchange("e2", false, true),
                new DirectExchange("e3", false, true));
    }

    @Bean
    public Declarables qs() {
        return new Declarables(
                new Queue("q2", false, false, true),
                new Queue("q3", false, false, true));
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Declarables prototypes() {
        return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
    }

    @Bean
    public Declarables bs() {
        return new Declarables(
                new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
                new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
    }

    @Bean
    public Declarables ds() {
        return new Declarables(
                new DirectExchange("e4", false, true),
                new Queue("q4", false, false, true),
                new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
    }

}
在 2.1 之前的版本中,您可以通過定義 Collection<Declarable> 類型的 Bean 來宣告多個 Declarable 實例。在某些情況下,這可能會導致不良的副作用,因為管理員必須迭代所有 Collection<?> Bean。

2.2 版將 getDeclarablesByType 方法新增至 Declarables;例如,在宣告監聽器容器 Bean 時,可以將其用作便利方法。

public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
        Declarables mixedDeclarables, MessageListener listener) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
    container.setMessageListener(listener);
    return container;
}

條件式宣告

預設情況下,所有佇列、交換器和繫結都由應用程式內容中的所有 RabbitAdmin 實例宣告(假設它們具有 auto-startup="true")。

從 2.1.9 版開始,RabbitAdmin 有一個新的屬性 explicitDeclarationsOnly(預設為 false);當此屬性設定為 true 時,管理員將僅宣告明確配置為由該管理員宣告的 Bean。

從 1.2 版本開始,您可以有條件地宣告這些元素。當應用程式連線到多個 Broker 並需要指定特定元素應與哪個 Broker 宣告時,這特別有用。

代表這些元素的類別實作 Declarable,它有兩個方法:shouldDeclare()getDeclaringAdmins()RabbitAdmin 使用這些方法來確定特定實例是否應實際處理其 Connection 上的宣告。

這些屬性在命名空間中作為屬性可用,如以下範例所示

<rabbit:admin id="admin1" connection-factory="CF1" />

<rabbit:admin id="admin2" connection-factory="CF2" />

<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />

<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />

<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />

<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />

<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />

<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
    <rabbit:bindings>
        <rabbit:binding key="foo" queue="bar"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
預設情況下,auto-declare 屬性為 true,並且如果未提供 declared-by(或為空),則所有 RabbitAdmin 實例都會宣告物件(只要管理員的 auto-startup 屬性為 true,預設值,並且管理員的 explicit-declarations-only 屬性為 false)。

同樣地,您可以使用基於 Java 的 @Configuration 來達到相同的效果。在以下範例中,元件由 admin1 宣告,但不由 admin2 宣告

@Bean
public RabbitAdmin admin1() {
    return new RabbitAdmin(cf1());
}

@Bean
public RabbitAdmin admin2() {
    return new RabbitAdmin(cf2());
}

@Bean
public Queue queue() {
    Queue queue = new Queue("foo");
    queue.setAdminsThatShouldDeclare(admin1());
    return queue;
}

@Bean
public Exchange exchange() {
    DirectExchange exchange = new DirectExchange("bar");
    exchange.setAdminsThatShouldDeclare(admin1());
    return exchange;
}

@Bean
public Binding binding() {
    Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
    binding.setAdminsThatShouldDeclare(admin1());
    return binding;
}

關於 idname 屬性的注意事項

<rabbit:queue/><rabbit:exchange/> 元素上的 name 屬性反映了 Broker 中實體的名稱。對於佇列,如果省略 name,則會建立匿名佇列(請參閱 AnonymousQueue)。

在 2.0 之前的版本中,name 也註冊為 Bean 名稱別名(類似於 <bean/> 元素上的 name)。

這導致了兩個問題

  • 它阻止了宣告具有相同名稱的佇列和交換器。

  • 如果別名包含 SpEL 表達式 (#{…​}),則無法解析別名。

從 2.0 版開始,如果您宣告其中一個元素同時具有 id *和* name 屬性,則名稱不再宣告為 Bean 名稱別名。如果您希望宣告具有相同 name 的佇列和交換器,則必須提供 id

如果元素僅具有 name 屬性,則沒有變更。Bean 仍然可以通過 name 引用 — 例如,在繫結宣告中。但是,如果名稱包含 SpEL,您仍然無法引用它 — 您必須提供 id 以供參考。

AnonymousQueue

一般而言,當您需要唯一命名、獨佔、自動刪除的佇列時,我們建議您使用 AnonymousQueue 而不是 Broker 定義的佇列名稱(使用 "" 作為 Queue 名稱會導致 Broker 產生佇列名稱)。

這是因為

  1. 佇列實際上是在建立與 Broker 的連線時宣告的。這遠在 Bean 建立並連接在一起之後。使用佇列的 Bean 需要知道其名稱。事實上,當應用程式啟動時,Broker 甚至可能沒有運行。

  2. 如果與 Broker 的連線因某種原因遺失,管理員會使用相同的名稱重新宣告 AnonymousQueue。如果我們使用 Broker 宣告的佇列,佇列名稱會變更。

您可以控制 AnonymousQueue 實例使用的佇列名稱格式。

預設情況下,佇列名稱以 spring.gen- 為前綴,後跟 UUID 的 base64 表示形式 — 例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g

您可以在建構函式引數中提供 AnonymousQueue.NamingStrategy 實作。以下範例示範如何執行此操作

@Bean
public Queue anon1() {
    return new AnonymousQueue();
}

@Bean
public Queue anon2() {
    return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}

@Bean
public Queue anon3() {
    return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}

第一個 Bean 產生一個以 spring.gen- 為前綴的佇列名稱,後跟 UUID 的 base64 表示形式 — 例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。第二個 Bean 產生一個以 something- 為前綴的佇列名稱,後跟 UUID 的 base64 表示形式。第三個 Bean 僅使用 UUID(沒有 base64 轉換)產生一個名稱 — 例如,f20c818a-006b-4416-bf91-643590fedb0e

base64 編碼使用 RFC 4648 中的「URL 和檔案名稱安全字母表」。尾隨填充字元 (=) 會被移除。

您可以提供自己的命名策略,您可以藉此在佇列名稱中包含其他資訊(例如應用程式名稱或用戶端主機)。

當您使用 XML 設定時,可以指定命名策略。naming-strategy 屬性存在於 <rabbit:queue> 元素上,用於實作 AnonymousQueue.NamingStrategy 的 Bean 參考。以下範例示範如何以各種方式指定命名策略

<rabbit:queue id="uuidAnon" />

<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />

<rabbit:queue id="customAnon" naming-strategy="customNamer" />

<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />

<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
    <constructor-arg value="custom.gen-" />
</bean>

第一個範例建立諸如 spring.gen-MRBv9sqISkuCiPfOYfpo4g 之類的名稱。第二個範例建立具有 UUID 字串表示形式的名稱。第三個範例建立諸如 custom.gen-MRBv9sqISkuCiPfOYfpo4g 之類的名稱。

您還可以提供自己的命名策略 Bean。

從 2.1 版開始,匿名佇列在宣告時,預設情況下會將引數 Queue.X_QUEUE_LEADER_LOCATOR 設定為 client-local。這確保佇列在應用程式連線的節點上宣告。您可以在建構實例後通過調用 queue.setLeaderLocator(null) 來恢復到先前的行為。

恢復自動刪除宣告

通常,RabbitAdmin 僅恢復在應用程式內容中宣告為 Bean 的佇列/交換器/繫結;如果任何此類宣告是自動刪除的,則如果連線遺失,它們將被 Broker 移除。當重新建立連線時,管理員將重新宣告實體。通常,通過調用 admin.declareQueue(…​)admin.declareExchange(…​)admin.declareBinding(…​) 建立的實體將不會被恢復。

從 2.4 版開始,管理員有一個新的屬性 redeclareManualDeclarations;當為 true 時,管理員除了恢復應用程式內容中的 Bean 外,還將恢復這些實體。

如果調用 deleteQueue(…​)deleteExchange(…​)removeBinding(…​),則不會執行個別宣告的恢復。當佇列和交換器被刪除時,相關的繫結會從可恢復的實體中移除。

最後,調用 resetAllManualDeclarations() 將阻止恢復任何先前宣告的實體。