AmqpTemplate

如同 Spring Framework 和相關專案提供的許多其他高階抽象概念一樣,Spring AMQP 提供了一個扮演核心角色的「範本」。定義主要操作的介面稱為 AmqpTemplate。這些操作涵蓋了傳送和接收訊息的一般行為。換句話說,它們並非任何實作獨有 — 因此名稱中包含「AMQP」。另一方面,該介面的一些實作與 AMQP 通訊協定的實作相關聯。與 JMS(本身就是介面層級 API)不同,AMQP 是一種線路層級通訊協定。該通訊協定的實作提供了它們自己的用戶端程式庫,因此範本介面的每個實作都依賴於特定的用戶端程式庫。目前,只有單一實作:RabbitTemplate。在後續的範例中,我們經常使用 AmqpTemplate。但是,當您查看組態範例或範本被實例化或調用 setter 的任何程式碼片段時,您可以看到實作類型(例如,RabbitTemplate)。

如先前所述,AmqpTemplate 介面定義了傳送和接收訊息的所有基本操作。我們將分別在 傳送訊息接收訊息 中探討訊息傳送和接收。

另請參閱 非同步 Rabbit 範本

新增重試能力

從 1.3 版開始,您現在可以設定 RabbitTemplate 以使用 RetryTemplate 來協助處理 Broker 連線問題。請參閱 spring-retry 專案以取得完整資訊。以下僅為一個範例,其使用指數退避原則和預設的 SimpleRetryPolicy,後者會在將例外拋給呼叫者之前嘗試三次。

以下範例使用 XML 命名空間

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="500" />
            <property name="multiplier" value="10.0" />
            <property name="maxInterval" value="10000" />
        </bean>
    </property>
</bean>

以下範例在 Java 中使用 @Configuration 註解

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    return template;
}

從 1.4 版開始,除了 retryTemplate 屬性之外,RabbitTemplate 也支援 recoveryCallback 選項。它用作 RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) 的第二個引數。

RecoveryCallback 有些限制,因為重試內容僅包含 lastThrowable 欄位。對於更複雜的使用案例,您應該使用外部 RetryTemplate,以便您可以透過內容的屬性將其他資訊傳達給 RecoveryCallback。以下範例示範如何執行此操作
retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }

    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}

在這種情況下,您**不會**將 RetryTemplate 注入到 RabbitTemplate 中。

發布是非同步的 — 如何偵測成功與失敗

發布訊息是一種非同步機制,預設情況下,無法路由的訊息會被 RabbitMQ 丟棄。對於成功的發布,您可以接收非同步確認,如 相關發布者確認和退回 中所述。考慮兩種失敗情況

  • 發布到交換器,但沒有相符的目的地佇列。

  • 發布到不存在的交換器。

第一種情況由發布者退回涵蓋,如 相關發布者確認和退回 中所述。

對於第二種情況,訊息會被丟棄,且不會產生任何退回。底層通道會因例外而關閉。預設情況下,此例外會被記錄,但您可以向 CachingConnectionFactory 註冊 ChannelListener 以取得此類事件的通知。以下範例示範如何新增 ConnectionListener

this.connectionFactory.addConnectionListener(new ConnectionListener() {

    @Override
    public void onCreate(Connection connection) {
    }

    @Override
    public void onShutDown(ShutdownSignalException signal) {
        ...
    }

});

您可以檢查訊號的 reason 屬性來判斷發生的問題。

若要在傳送執行緒上偵測例外,您可以對 RabbitTemplate 執行 setChannelTransacted(true),並且會在 txCommit() 上偵測到例外。但是,**交易會顯著降低效能**,因此在僅針對此單一使用案例啟用交易之前,請仔細考慮。

相關發布者確認和退回

AmqpTemplateRabbitTemplate 實作支援發布者確認和退回。

對於退回的訊息,範本的 mandatory 屬性必須設定為 true,或者 mandatory-expression 對於特定訊息必須評估為 true。此功能需要將其 publisherReturns 屬性設定為 trueCachingConnectionFactory(請參閱 發布者確認和退回)。退回會透過用戶端呼叫 setReturnsCallback(ReturnsCallback callback) 來註冊 RabbitTemplate.ReturnsCallback 而傳送至用戶端。回呼必須實作以下方法

void returnedMessage(ReturnedMessage returned);

ReturnedMessage 具有以下屬性

  • message - 退回的訊息本身

  • replyCode - 指示退回原因的程式碼

  • replyText - 退回的文字原因 - 例如 NO_ROUTE

  • exchange - 訊息傳送至的交換器

  • routingKey - 使用的路由金鑰

每個 RabbitTemplate 僅支援一個 ReturnsCallback。另請參閱 回覆逾時

對於發布者確認(也稱為發布者確認應答),範本需要將其 publisherConfirm 屬性設定為 ConfirmType.CORRELATEDCachingConnectionFactory。確認會透過用戶端呼叫 setConfirmCallback(ConfirmCallback callback) 來註冊 RabbitTemplate.ConfirmCallback 而傳送至用戶端。回呼必須實作此方法

void confirm(CorrelationData correlationData, boolean ack, String cause);

CorrelationData 是用戶端在傳送原始訊息時提供的物件。ack 對於 ack 為 true,對於 nack 為 false。對於 nack 實例,如果產生 nack 時原因可用,則原因可能包含 nack 的原因。範例是傳送訊息至不存在的交換器時。在這種情況下,Broker 會關閉通道。關閉的原因包含在 cause 中。cause 是在 1.4 版中新增的。

一個 RabbitTemplate 僅支援一個 ConfirmCallback

當 rabbit 範本傳送操作完成時,通道會關閉。當連線工廠快取已滿時(當快取中有空間時,通道不會實際關閉,且退回和確認會正常進行),這會排除接收確認或退回。當快取已滿時,架構會延遲關閉最多五秒,以便讓確認和退回有時間被接收。當使用確認時,通道會在收到最後一個確認時關閉。當僅使用退回時,通道會保持開啟五秒鐘。我們通常建議將連線工廠的 channelCacheSize 設定為足夠大的值,以便訊息發布所在的通道會傳回至快取,而不是被關閉。您可以使用 RabbitMQ 管理外掛程式來監控通道使用率。如果您看到通道快速開啟和關閉,您應該考慮增加快取大小以減少伺服器上的額外負荷。
在 2.1 版之前,針對發布者確認啟用的通道會在收到確認之前傳回至快取。某些其他程序可能會取出通道並執行某些操作,導致通道關閉 — 例如將訊息發布到不存在的交換器。這可能會導致確認遺失。2.1 版及更新版本在確認未完成時不再將通道傳回至快取。RabbitTemplate 會在每次操作後對通道執行邏輯 close()。一般而言,這表示在一個通道上一次只有一個確認未完成。
從 2.2 版開始,回呼會在連線工廠的 executor 執行緒之一上調用。這是為了避免當您從回呼中執行 Rabbit 操作時可能發生的死鎖。在先前的版本中,回呼會直接在 amqp-client 連線 I/O 執行緒上調用;如果您執行某些 RPC 操作(例如開啟新通道),這會導致死鎖,因為 I/O 執行緒會封鎖等待結果,但結果需要由 I/O 執行緒本身處理。在這些版本中,有必要將工作(例如傳送訊息)交給回呼中的另一個執行緒。由於架構現在將回呼調用交給執行器,因此不再需要這樣做。
在 ack 之前接收退回訊息的保證仍然維持,只要退回回呼在 60 秒或更短時間內執行即可。確認排定在退回回呼結束後或 60 秒後傳遞,以先到者為準。

CorrelationData 物件具有 CompletableFuture,您可以使用它來取得結果,而不是在範本上使用 ConfirmCallback。以下範例示範如何設定 CorrelationData 實例

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...

由於它是 CompletableFuture<Confirm>,因此您可以 get() 在準備就緒時取得結果,或使用 whenComplete() 進行非同步回呼。Confirm 物件是一個簡單的 bean,具有 2 個屬性:ackreason(適用於 nack 實例)。原因不會針對 Broker 產生的 nack 實例填入。它會針對架構產生的 nack 實例填入(例如,在 ack 實例未完成時關閉連線)。

此外,當同時啟用確認和退回時,如果 CorrelationData return 屬性無法路由到任何佇列,則會填入退回的訊息。保證在 future 設定為 ack 之前,退回的訊息屬性已設定。CorrelationData.getReturn() 傳回具有屬性的 ReturnMessage

  • message(退回的訊息)

  • replyCode

  • replyText

  • exchange

  • routingKey

另請參閱 範圍操作,以取得更簡單的機制來等待發布者確認。

範圍操作

通常,當使用範本時,會從快取中取出(或建立)Channel,用於操作,然後傳回至快取以供重複使用。在多執行緒環境中,無法保證下一個操作會使用相同的通道。但是,有時您可能想要更精確地控制通道的使用,並確保多個操作都在同一個通道上執行。

從 2.0 版開始,提供了一個名為 invoke 的新方法,搭配 OperationsCallback。在回呼範圍內以及在提供的 RabbitOperations 引數上執行的任何操作都會使用相同的專用 Channel,該通道將在結束時關閉(不會傳回至快取)。如果通道是 PublisherCallbackChannel,則會在收到所有確認後傳回至快取(請參閱 相關發布者確認和退回)。

@FunctionalInterface
public interface OperationsCallback<T> {

    T doInRabbit(RabbitOperations operations);

}

您可能需要此功能的其中一個範例是,如果您想要在底層 Channel 上使用 waitForConfirms() 方法。此方法先前未由 Spring API 公開,因為通道通常會被快取和共用,如先前所述。RabbitTemplate 現在提供 waitForConfirms(long timeout)waitForConfirmsOrDie(long timeout),它們委派給在 OperationsCallback 範圍內使用的專用通道。由於顯而易見的原因,這些方法無法在該範圍之外使用。

請注意,較高層級的抽象概念可讓您將確認與請求關聯,在其他地方提供(請參閱 相關發布者確認和退回)。如果您只想等待到 Broker 確認交付,則可以使用以下範例中顯示的技術

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
});

如果您希望在 OperationsCallback 範圍內在同一個通道上調用 RabbitAdmin 操作,則管理員必須已使用與用於 invoke 操作的 RabbitTemplate 相同的範本建構。

如果範本操作已在現有交易的範圍內執行,則先前的討論是沒有意義的 — 例如,當在交易式監聽器容器執行緒上執行並在交易式範本上執行操作時。在這種情況下,操作會在該通道上執行,並在執行緒傳回至容器時提交。在這種情況下,不需要使用 invoke

當以這種方式使用確認時,為將確認與請求關聯而設定的大部分基礎架構並非真正需要(除非也啟用了退回)。從 2.2 版開始,連線工廠支援名為 publisherConfirmType 的新屬性。當此屬性設定為 ConfirmType.SIMPLE 時,會避免使用基礎架構,且確認處理可以更有效率。

此外,RabbitTemplate 會在傳送的訊息 MessageProperties 中設定 publisherSequenceNumber 屬性。如果您想要檢查(或記錄或以其他方式使用)特定確認,您可以使用多載的 invoke 方法來執行此操作,如下列範例所示

public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
        com.rabbitmq.client.ConfirmCallback nacks);
這些 ConfirmCallback 物件(適用於 acknack 實例)是 Rabbit 用戶端回呼,而不是範本回呼。

以下範例記錄 acknack 實例

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
}, (tag, multiple) -> {
        log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
        log.info("Nack: " + tag + ":" + multiple);
}));
範圍操作會繫結至執行緒。請參閱 多執行緒環境中的嚴格訊息排序,以取得關於多執行緒環境中嚴格排序的討論。

多執行緒環境中的嚴格訊息排序

範圍操作 中的討論僅適用於在同一個執行緒上執行操作時。

考慮以下情況

  • thread-1 將訊息傳送至佇列,並將工作交給 thread-2

  • thread-2 將訊息傳送至相同的佇列

由於 RabbitMQ 的非同步性質以及快取通道的使用;無法確定是否會使用相同的通道,因此無法保證訊息到達佇列的順序。(在大多數情況下,它們會依序到達,但無序交付的可能性並非為零)。若要解決此使用案例,您可以使用大小為 1 的有界通道快取(以及 channelCheckoutTimeout)來確保訊息始終在同一個通道上發布,並且順序將得到保證。若要執行此操作,如果您對連線工廠有其他用途,例如消費者,則您應該為範本使用專用連線工廠,或設定範本以使用內嵌在主要連線工廠中的發布者連線工廠(請參閱 使用個別連線)。

使用簡單的 Spring Boot 應用程式最能說明這一點

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	CachingConnectionFactory ccf() {
		CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
		CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
		publisherCF.setChannelCacheSize(1);
		publisherCF.setChannelCheckoutTimeout(1000L);
		return ccf;
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	Service(RabbitTemplate template, TaskExecutor exec) {
		template.setUsePublisherConnection(true);
		this.template = template;
		this.exec = exec;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
	}

	void secondaryService(String toSend) {
		LOG.info("Publishing from secondary service");
		this.template.convertAndSend("queue", toSend);
	}

}

即使發布在兩個不同的執行緒上執行,它們也都會使用相同的通道,因為快取限制為單一通道。

從 2.3.7 版開始,ThreadChannelConnectionFactory 支援使用 prepareContextSwitchswitchContext 方法將執行緒的通道轉移到另一個執行緒。第一個方法傳回傳遞至第二個執行緒的內容,第二個執行緒呼叫第二個方法。一個執行緒可以具有繫結至它的非交易式通道或交易式通道(或各一個);您無法個別轉移它們,除非您使用兩個連線工廠。以下範例

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	ThreadChannelConnectionFactory tccf() {
		ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
		rabbitConnectionFactory.setHost("localhost");
		return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	private final ThreadChannelConnectionFactory connFactory;

	Service(RabbitTemplate template, TaskExecutor exec,
			ThreadChannelConnectionFactory tccf) {

		this.template = template;
		this.exec = exec;
		this.connFactory = tccf;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		Object context = this.connFactory.prepareSwitchContext();
		this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
	}

	void secondaryService(String toSend, Object threadContext) {
		LOG.info("Publishing from secondary service");
		this.connFactory.switchContext(threadContext);
		this.template.convertAndSend("queue", toSend);
		this.connFactory.closeThreadChannel();
	}

}
一旦呼叫 prepareSwitchContext,如果目前執行緒執行任何更多操作,它們將在新通道上執行。不再需要執行緒繫結通道時,務必關閉它。

訊息傳遞整合

從 1.4 版開始,RabbitMessagingTemplate(建置於 RabbitTemplate 之上)提供與 Spring Framework 訊息傳遞抽象概念的整合 — 即 org.springframework.messaging.Message。這可讓您透過使用 spring-messaging Message<?> 抽象概念來傳送和接收訊息。此抽象概念由其他 Spring 專案使用,例如 Spring Integration 和 Spring 的 STOMP 支援。涉及兩個訊息轉換器:一個用於在 spring-messaging Message<?> 和 Spring AMQP 的 Message 抽象概念之間進行轉換,另一個用於在 Spring AMQP 的 Message 抽象概念和底層 RabbitMQ 用戶端程式庫所需的格式之間進行轉換。預設情況下,訊息酬載由提供的 RabbitTemplate 實例的訊息轉換器轉換。或者,您可以注入自訂 MessagingMessageConverter 以及其他一些酬載轉換器,如下列範例所示

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);

已驗證的使用者 ID

從 1.6 版開始,範本現在支援 user-id-expression(使用 Java 設定時為 userIdExpression)。如果傳送訊息,則會在評估此運算式後設定使用者 ID 屬性(如果尚未設定)。評估的根物件是要傳送的訊息。

以下範例示範如何使用 user-id-expression 屬性

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />

第一個範例是常值運算式。第二個範例從應用程式內容中的連線工廠 bean 取得 username 屬性。

使用個別連線

從 2.0.2 版開始,您可以將 usePublisherConnection 屬性設定為 true,以便在可能的情況下使用與監聽器容器所用連線不同的連線。這是為了避免在生產者因任何原因而被封鎖時,消費者受到封鎖。連線工廠為此目的維護第二個內部連線工廠;預設情況下,它與主要工廠的類型相同,但如果您希望為發布使用不同的工廠類型,則可以明確設定。如果 rabbit 範本在監聽器容器啟動的交易中執行,則無論此設定為何,都會使用容器的通道。

一般而言,您不應將 RabbitAdmin 與將此設定為 true 的範本一起使用。使用接受連線工廠的 RabbitAdmin 建構子。如果您使用接受範本的其他建構子,請確保範本的屬性為 false。這是因為,管理員通常用於宣告監聽器容器的佇列。使用將屬性設定為 true 的範本表示獨佔佇列(例如 AnonymousQueue)將在與監聽器容器所用連線不同的連線上宣告。在這種情況下,容器無法使用佇列。