AmqpTemplate
如同 Spring Framework 和相關專案提供的許多其他高階抽象概念一樣,Spring AMQP 提供了一個扮演核心角色的「範本」。定義主要操作的介面稱為 AmqpTemplate
。這些操作涵蓋了傳送和接收訊息的一般行為。換句話說,它們並非任何實作獨有 — 因此名稱中包含「AMQP」。另一方面,該介面的一些實作與 AMQP 通訊協定的實作相關聯。與 JMS(本身就是介面層級 API)不同,AMQP 是一種線路層級通訊協定。該通訊協定的實作提供了它們自己的用戶端程式庫,因此範本介面的每個實作都依賴於特定的用戶端程式庫。目前,只有單一實作:RabbitTemplate
。在後續的範例中,我們經常使用 AmqpTemplate
。但是,當您查看組態範例或範本被實例化或調用 setter 的任何程式碼片段時,您可以看到實作類型(例如,RabbitTemplate
)。
另請參閱 非同步 Rabbit 範本。
新增重試能力
從 1.3 版開始,您現在可以設定 RabbitTemplate
以使用 RetryTemplate
來協助處理 Broker 連線問題。請參閱 spring-retry 專案以取得完整資訊。以下僅為一個範例,其使用指數退避原則和預設的 SimpleRetryPolicy
,後者會在將例外拋給呼叫者之前嘗試三次。
以下範例使用 XML 命名空間
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
以下範例在 Java 中使用 @Configuration
註解
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
從 1.4 版開始,除了 retryTemplate
屬性之外,RabbitTemplate
也支援 recoveryCallback
選項。它用作 RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)
的第二個引數。
RecoveryCallback 有些限制,因為重試內容僅包含 lastThrowable 欄位。對於更複雜的使用案例,您應該使用外部 RetryTemplate ,以便您可以透過內容的屬性將其他資訊傳達給 RecoveryCallback 。以下範例示範如何執行此操作 |
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
在這種情況下,您**不會**將 RetryTemplate
注入到 RabbitTemplate
中。
發布是非同步的 — 如何偵測成功與失敗
發布訊息是一種非同步機制,預設情況下,無法路由的訊息會被 RabbitMQ 丟棄。對於成功的發布,您可以接收非同步確認,如 相關發布者確認和退回 中所述。考慮兩種失敗情況
-
發布到交換器,但沒有相符的目的地佇列。
-
發布到不存在的交換器。
第一種情況由發布者退回涵蓋,如 相關發布者確認和退回 中所述。
對於第二種情況,訊息會被丟棄,且不會產生任何退回。底層通道會因例外而關閉。預設情況下,此例外會被記錄,但您可以向 CachingConnectionFactory
註冊 ChannelListener
以取得此類事件的通知。以下範例示範如何新增 ConnectionListener
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
您可以檢查訊號的 reason
屬性來判斷發生的問題。
若要在傳送執行緒上偵測例外,您可以對 RabbitTemplate
執行 setChannelTransacted(true)
,並且會在 txCommit()
上偵測到例外。但是,**交易會顯著降低效能**,因此在僅針對此單一使用案例啟用交易之前,請仔細考慮。
相關發布者確認和退回
AmqpTemplate
的 RabbitTemplate
實作支援發布者確認和退回。
對於退回的訊息,範本的 mandatory
屬性必須設定為 true
,或者 mandatory-expression
對於特定訊息必須評估為 true
。此功能需要將其 publisherReturns
屬性設定為 true
的 CachingConnectionFactory
(請參閱 發布者確認和退回)。退回會透過用戶端呼叫 setReturnsCallback(ReturnsCallback callback)
來註冊 RabbitTemplate.ReturnsCallback
而傳送至用戶端。回呼必須實作以下方法
void returnedMessage(ReturnedMessage returned);
ReturnedMessage
具有以下屬性
-
message
- 退回的訊息本身 -
replyCode
- 指示退回原因的程式碼 -
replyText
- 退回的文字原因 - 例如NO_ROUTE
-
exchange
- 訊息傳送至的交換器 -
routingKey
- 使用的路由金鑰
每個 RabbitTemplate
僅支援一個 ReturnsCallback
。另請參閱 回覆逾時。
對於發布者確認(也稱為發布者確認應答),範本需要將其 publisherConfirm
屬性設定為 ConfirmType.CORRELATED
的 CachingConnectionFactory
。確認會透過用戶端呼叫 setConfirmCallback(ConfirmCallback callback)
來註冊 RabbitTemplate.ConfirmCallback
而傳送至用戶端。回呼必須實作此方法
void confirm(CorrelationData correlationData, boolean ack, String cause);
CorrelationData
是用戶端在傳送原始訊息時提供的物件。ack
對於 ack
為 true,對於 nack
為 false。對於 nack
實例,如果產生 nack
時原因可用,則原因可能包含 nack
的原因。範例是傳送訊息至不存在的交換器時。在這種情況下,Broker 會關閉通道。關閉的原因包含在 cause
中。cause
是在 1.4 版中新增的。
一個 RabbitTemplate
僅支援一個 ConfirmCallback
。
當 rabbit 範本傳送操作完成時,通道會關閉。當連線工廠快取已滿時(當快取中有空間時,通道不會實際關閉,且退回和確認會正常進行),這會排除接收確認或退回。當快取已滿時,架構會延遲關閉最多五秒,以便讓確認和退回有時間被接收。當使用確認時,通道會在收到最後一個確認時關閉。當僅使用退回時,通道會保持開啟五秒鐘。我們通常建議將連線工廠的 channelCacheSize 設定為足夠大的值,以便訊息發布所在的通道會傳回至快取,而不是被關閉。您可以使用 RabbitMQ 管理外掛程式來監控通道使用率。如果您看到通道快速開啟和關閉,您應該考慮增加快取大小以減少伺服器上的額外負荷。 |
在 2.1 版之前,針對發布者確認啟用的通道會在收到確認之前傳回至快取。某些其他程序可能會取出通道並執行某些操作,導致通道關閉 — 例如將訊息發布到不存在的交換器。這可能會導致確認遺失。2.1 版及更新版本在確認未完成時不再將通道傳回至快取。RabbitTemplate 會在每次操作後對通道執行邏輯 close() 。一般而言,這表示在一個通道上一次只有一個確認未完成。 |
從 2.2 版開始,回呼會在連線工廠的 executor 執行緒之一上調用。這是為了避免當您從回呼中執行 Rabbit 操作時可能發生的死鎖。在先前的版本中,回呼會直接在 amqp-client 連線 I/O 執行緒上調用;如果您執行某些 RPC 操作(例如開啟新通道),這會導致死鎖,因為 I/O 執行緒會封鎖等待結果,但結果需要由 I/O 執行緒本身處理。在這些版本中,有必要將工作(例如傳送訊息)交給回呼中的另一個執行緒。由於架構現在將回呼調用交給執行器,因此不再需要這樣做。 |
在 ack 之前接收退回訊息的保證仍然維持,只要退回回呼在 60 秒或更短時間內執行即可。確認排定在退回回呼結束後或 60 秒後傳遞,以先到者為準。 |
CorrelationData
物件具有 CompletableFuture
,您可以使用它來取得結果,而不是在範本上使用 ConfirmCallback
。以下範例示範如何設定 CorrelationData
實例
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
由於它是 CompletableFuture<Confirm>
,因此您可以 get()
在準備就緒時取得結果,或使用 whenComplete()
進行非同步回呼。Confirm
物件是一個簡單的 bean,具有 2 個屬性:ack
和 reason
(適用於 nack
實例)。原因不會針對 Broker 產生的 nack
實例填入。它會針對架構產生的 nack
實例填入(例如,在 ack
實例未完成時關閉連線)。
此外,當同時啟用確認和退回時,如果 CorrelationData
return
屬性無法路由到任何佇列,則會填入退回的訊息。保證在 future 設定為 ack
之前,退回的訊息屬性已設定。CorrelationData.getReturn()
傳回具有屬性的 ReturnMessage
-
message(退回的訊息)
-
replyCode
-
replyText
-
exchange
-
routingKey
另請參閱 範圍操作,以取得更簡單的機制來等待發布者確認。
範圍操作
通常,當使用範本時,會從快取中取出(或建立)Channel
,用於操作,然後傳回至快取以供重複使用。在多執行緒環境中,無法保證下一個操作會使用相同的通道。但是,有時您可能想要更精確地控制通道的使用,並確保多個操作都在同一個通道上執行。
從 2.0 版開始,提供了一個名為 invoke
的新方法,搭配 OperationsCallback
。在回呼範圍內以及在提供的 RabbitOperations
引數上執行的任何操作都會使用相同的專用 Channel
,該通道將在結束時關閉(不會傳回至快取)。如果通道是 PublisherCallbackChannel
,則會在收到所有確認後傳回至快取(請參閱 相關發布者確認和退回)。
@FunctionalInterface
public interface OperationsCallback<T> {
T doInRabbit(RabbitOperations operations);
}
您可能需要此功能的其中一個範例是,如果您想要在底層 Channel
上使用 waitForConfirms()
方法。此方法先前未由 Spring API 公開,因為通道通常會被快取和共用,如先前所述。RabbitTemplate
現在提供 waitForConfirms(long timeout)
和 waitForConfirmsOrDie(long timeout)
,它們委派給在 OperationsCallback
範圍內使用的專用通道。由於顯而易見的原因,這些方法無法在該範圍之外使用。
請注意,較高層級的抽象概念可讓您將確認與請求關聯,在其他地方提供(請參閱 相關發布者確認和退回)。如果您只想等待到 Broker 確認交付,則可以使用以下範例中顯示的技術
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});
如果您希望在 OperationsCallback
範圍內在同一個通道上調用 RabbitAdmin
操作,則管理員必須已使用與用於 invoke
操作的 RabbitTemplate
相同的範本建構。
如果範本操作已在現有交易的範圍內執行,則先前的討論是沒有意義的 — 例如,當在交易式監聽器容器執行緒上執行並在交易式範本上執行操作時。在這種情況下,操作會在該通道上執行,並在執行緒傳回至容器時提交。在這種情況下,不需要使用 invoke 。 |
當以這種方式使用確認時,為將確認與請求關聯而設定的大部分基礎架構並非真正需要(除非也啟用了退回)。從 2.2 版開始,連線工廠支援名為 publisherConfirmType
的新屬性。當此屬性設定為 ConfirmType.SIMPLE
時,會避免使用基礎架構,且確認處理可以更有效率。
此外,RabbitTemplate
會在傳送的訊息 MessageProperties
中設定 publisherSequenceNumber
屬性。如果您想要檢查(或記錄或以其他方式使用)特定確認,您可以使用多載的 invoke
方法來執行此操作,如下列範例所示
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
這些 ConfirmCallback 物件(適用於 ack 和 nack 實例)是 Rabbit 用戶端回呼,而不是範本回呼。 |
以下範例記錄 ack
和 nack
實例
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));
範圍操作會繫結至執行緒。請參閱 多執行緒環境中的嚴格訊息排序,以取得關於多執行緒環境中嚴格排序的討論。 |
多執行緒環境中的嚴格訊息排序
範圍操作 中的討論僅適用於在同一個執行緒上執行操作時。
考慮以下情況
-
thread-1
將訊息傳送至佇列,並將工作交給thread-2
-
thread-2
將訊息傳送至相同的佇列
由於 RabbitMQ 的非同步性質以及快取通道的使用;無法確定是否會使用相同的通道,因此無法保證訊息到達佇列的順序。(在大多數情況下,它們會依序到達,但無序交付的可能性並非為零)。若要解決此使用案例,您可以使用大小為 1
的有界通道快取(以及 channelCheckoutTimeout
)來確保訊息始終在同一個通道上發布,並且順序將得到保證。若要執行此操作,如果您對連線工廠有其他用途,例如消費者,則您應該為範本使用專用連線工廠,或設定範本以使用內嵌在主要連線工廠中的發布者連線工廠(請參閱 使用個別連線)。
使用簡單的 Spring Boot 應用程式最能說明這一點
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}
void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}
}
即使發布在兩個不同的執行緒上執行,它們也都會使用相同的通道,因為快取限制為單一通道。
從 2.3.7 版開始,ThreadChannelConnectionFactory
支援使用 prepareContextSwitch
和 switchContext
方法將執行緒的通道轉移到另一個執行緒。第一個方法傳回傳遞至第二個執行緒的內容,第二個執行緒呼叫第二個方法。一個執行緒可以具有繫結至它的非交易式通道或交易式通道(或各一個);您無法個別轉移它們,除非您使用兩個連線工廠。以下範例
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
private final ThreadChannelConnectionFactory connFactory;
Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {
this.template = template;
this.exec = exec;
this.connFactory = tccf;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}
void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}
}
一旦呼叫 prepareSwitchContext ,如果目前執行緒執行任何更多操作,它們將在新通道上執行。不再需要執行緒繫結通道時,務必關閉它。 |
訊息傳遞整合
從 1.4 版開始,RabbitMessagingTemplate
(建置於 RabbitTemplate
之上)提供與 Spring Framework 訊息傳遞抽象概念的整合 — 即 org.springframework.messaging.Message
。這可讓您透過使用 spring-messaging
Message<?>
抽象概念來傳送和接收訊息。此抽象概念由其他 Spring 專案使用,例如 Spring Integration 和 Spring 的 STOMP 支援。涉及兩個訊息轉換器:一個用於在 spring-messaging Message<?>
和 Spring AMQP 的 Message
抽象概念之間進行轉換,另一個用於在 Spring AMQP 的 Message
抽象概念和底層 RabbitMQ 用戶端程式庫所需的格式之間進行轉換。預設情況下,訊息酬載由提供的 RabbitTemplate
實例的訊息轉換器轉換。或者,您可以注入自訂 MessagingMessageConverter
以及其他一些酬載轉換器,如下列範例所示
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
已驗證的使用者 ID
從 1.6 版開始,範本現在支援 user-id-expression
(使用 Java 設定時為 userIdExpression
)。如果傳送訊息,則會在評估此運算式後設定使用者 ID 屬性(如果尚未設定)。評估的根物件是要傳送的訊息。
以下範例示範如何使用 user-id-expression
屬性
<rabbit:template ... user-id-expression="'guest'" />
<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
第一個範例是常值運算式。第二個範例從應用程式內容中的連線工廠 bean 取得 username
屬性。
使用個別連線
從 2.0.2 版開始,您可以將 usePublisherConnection
屬性設定為 true
,以便在可能的情況下使用與監聽器容器所用連線不同的連線。這是為了避免在生產者因任何原因而被封鎖時,消費者受到封鎖。連線工廠為此目的維護第二個內部連線工廠;預設情況下,它與主要工廠的類型相同,但如果您希望為發布使用不同的工廠類型,則可以明確設定。如果 rabbit 範本在監聽器容器啟動的交易中執行,則無論此設定為何,都會使用容器的通道。
一般而言,您不應將 RabbitAdmin 與將此設定為 true 的範本一起使用。使用接受連線工廠的 RabbitAdmin 建構子。如果您使用接受範本的其他建構子,請確保範本的屬性為 false 。這是因為,管理員通常用於宣告監聽器容器的佇列。使用將屬性設定為 true 的範本表示獨佔佇列(例如 AnonymousQueue )將在與監聽器容器所用連線不同的連線上宣告。在這種情況下,容器無法使用佇列。 |