發送訊息

當發送訊息時,您可以使用以下任何方法

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

我們可以從前面清單中的最後一個方法開始討論,因為它實際上是最明確的。它允許在執行時提供 AMQP 交換器名稱(以及路由金鑰)。最後一個參數是負責實際建立訊息實例的回呼。以下範例顯示如何使用此方法來發送訊息:以下範例顯示如何使用 send 方法來發送訊息

amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
    new Message("12.34".getBytes(), someProperties));

如果您計劃使用該範本實例來發送到相同交換器的大部分或所有時間,則可以在範本本身上設定 exchange 屬性。在這種情況下,您可以使用前面清單中的第二個方法。以下範例在功能上與先前的範例等效

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

如果 exchangeroutingKey 屬性都設定在範本上,則可以使用僅接受 Message 的方法。以下範例顯示如何執行此操作

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

關於交換器和路由金鑰屬性,更好的思考方式是,明確的方法參數始終覆寫範本的預設值。實際上,即使您沒有在範本上明確設定這些屬性,也始終存在預設值。在這兩種情況下,預設值都是空 String,但這實際上是一個合理的預設值。就路由金鑰而言,它並非總是必要的(例如,對於 Fanout 交換器)。此外,佇列可以繫結到具有空 String 的交換器。這些都是依賴範本的路由金鑰屬性的預設空 String 值的合理情境。就交換器名稱而言,空 String 通常使用,因為 AMQP 規範將「預設交換器」定義為沒有名稱。由於所有佇列都會自動繫結到該預設交換器(它是直接交換器),因此將其名稱用作繫結值,因此可以使用前面清單中的第二種方法,透過預設交換器對任何佇列進行簡單的點對點訊息傳遞。您可以透過在執行時提供方法參數來提供佇列名稱作為 routingKey。以下範例顯示如何執行此操作

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

或者,您可以建立一個範本,該範本可用於主要或專門發佈到單一佇列。以下範例顯示如何執行此操作

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));

訊息建構器 API

從 1.3 版開始,MessageBuilderMessagePropertiesBuilder 提供了訊息建構器 API。這些方法提供了建立訊息或訊息屬性的便捷「流暢」方式。以下範例顯示了實際運作的流暢 API

Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .andProperties(props)
    .build();

可以設定在 MessageProperties 上定義的每個屬性。其他方法包括 setHeader(String key, String value)removeHeader(String key)removeHeaders()copyProperties(MessageProperties properties)。每個屬性設定方法都有一個 set*IfAbsent() 變體。在存在預設初始值的情況下,該方法命名為 set*IfAbsentOrDefault()

提供了五個靜態方法來建立初始訊息建構器

public static MessageBuilder withBody(byte[] body) (1)

public static MessageBuilder withClonedBody(byte[] body) (2)

public static MessageBuilder withBody(byte[] body, int from, int to) (3)

public static MessageBuilder fromMessage(Message message) (4)

public static MessageBuilder fromClonedMessage(Message message) (5)
1 建構器建立的訊息具有一個主體,該主體是對引數的直接參考。
2 建構器建立的訊息具有一個主體,該主體是一個新的陣列,其中包含引數中位元組的副本。
3 建構器建立的訊息具有一個主體,該主體是一個新的陣列,其中包含引數中的位元組範圍。如需更多詳細資訊,請參閱 Arrays.copyOfRange()
4 建構器建立的訊息具有一個主體,該主體是對引數主體的直接參考。引數的屬性會複製到新的 MessageProperties 物件。
5 建構器建立的訊息具有一個主體,該主體是一個新的陣列,其中包含引數主體的副本。引數的屬性會複製到新的 MessageProperties 物件。

提供了三個靜態方法來建立 MessagePropertiesBuilder 實例

public static MessagePropertiesBuilder newInstance() (1)

public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)

public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 新的訊息屬性物件會使用預設值初始化。
2 建構器會使用提供的屬性物件初始化,而 build() 將傳回該物件。
3 引數的屬性會複製到新的 MessageProperties 物件。

使用 AmqpTemplateRabbitTemplate 實作,每個 send() 方法都有一個多載版本,該版本採用額外的 CorrelationData 物件。當啟用發佈者確認時,此物件會在 AmqpTemplate 中描述的回呼中傳回。這讓發送者可以將確認 (acknack) 與已發送的訊息相關聯。

從 1.6.7 版開始,引入了 CorrelationAwareMessagePostProcessor 介面,允許在訊息轉換後修改關聯資料。以下範例顯示如何使用它

Message postProcessMessage(Message message, Correlation correlation);

在 2.0 版中,此介面已棄用。該方法已移至 MessagePostProcessor,並具有委派給 postProcessMessage(Message message) 的預設實作。

同樣從 1.6.7 版開始,提供了一個名為 CorrelationDataPostProcessor 的新回呼介面。這會在所有 MessagePostProcessor 實例(在 send() 方法中提供以及在 setBeforePublishPostProcessors() 中提供的實例)之後調用。實作可以更新或取代 send() 方法中提供的關聯資料(如果有的話)。Message 和原始 CorrelationData(如果有的話)作為引數提供。以下範例顯示如何使用 postProcess 方法

CorrelationData postProcess(Message message, CorrelationData correlationData);

發佈者回傳

當範本的 mandatory 屬性為 true 時,傳回的訊息由 AmqpTemplate 中描述的回呼提供。

從 1.4 版開始,RabbitTemplate 支援 SpEL mandatoryExpression 屬性,該屬性會針對每個請求訊息評估為根評估物件,解析為 boolean 值。Bean 參考,例如 @myBean.isMandatory(#root),可以在運算式中使用。

發佈者回傳也可以由 RabbitTemplate 在發送和接收操作中內部使用。如需更多資訊,請參閱 回覆逾時

批次處理

1.4.2 版引入了 BatchingRabbitTemplate。這是 RabbitTemplate 的子類別,具有覆寫的 send 方法,該方法根據 BatchingStrategy 批次處理訊息。僅當批次完成時,才會將訊息發送到 RabbitMQ。以下清單顯示了 BatchingStrategy 介面定義

public interface BatchingStrategy {

    MessageBatch addToBatch(String exchange, String routingKey, Message message);

    Date nextRelease();

    Collection<MessageBatch> releaseBatches();

}
批次資料保存在記憶體中。未發送的訊息可能會在系統故障時遺失。

提供了 SimpleBatchingStrategy。它支援將訊息發送到單一交換器或路由金鑰。它具有以下屬性

  • batchSize:在發送批次之前,批次中的訊息數量。

  • bufferLimit:批次訊息的最大大小。如果超過此值,它會搶先 batchSize,並導致發送部分批次。

  • timeout:在沒有新活動將訊息新增至批次時,發送部分批次的時間之後的時間。

SimpleBatchingStrategy 透過在每個嵌入式訊息前面加上四位元組二進位長度來格式化批次。這透過將 springBatchFormat 訊息屬性設定為 lengthHeader4 來傳達給接收系統。

預設情況下,批次訊息會由監聽器容器自動取消批次處理(透過使用 springBatchFormat 訊息標頭)。拒絕批次中的任何訊息都會導致整個批次被拒絕。

但是,如需更多資訊,請參閱 具有批次處理的 @RabbitListener